1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23 extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39 fn get_header(&self, _name: &str) -> Option<String> {
40 None
41 }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46 type CTX = RequestContext;
47
48 fn new_ctx(&self) -> Self::CTX {
49 RequestContext::new()
50 }
51
52 fn fail_to_connect(
53 &self,
54 _session: &mut Session,
55 peer: &HttpPeer,
56 ctx: &mut Self::CTX,
57 e: Box<Error>,
58 ) -> Box<Error> {
59 error!(
60 correlation_id = %ctx.trace_id,
61 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63 peer_address = %peer.address(),
64 error = %e,
65 "Failed to connect to upstream peer"
66 );
67 e
69 }
70
71 async fn early_request_filter(
74 &self,
75 session: &mut Session,
76 ctx: &mut Self::CTX,
77 ) -> Result<(), Box<Error>> {
78 let req_header = session.req_header();
80 let method = req_header.method.as_str();
81 let path = req_header.uri.path();
82 let host = req_header
83 .headers
84 .get("host")
85 .and_then(|h| h.to_str().ok())
86 .unwrap_or("");
87
88 if let Some(ref challenge_manager) = self.acme_challenges {
90 if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
91 if let Some(key_authorization) = challenge_manager.get_response(token) {
92 debug!(
93 token = %token,
94 "Serving ACME HTTP-01 challenge response"
95 );
96
97 let mut resp = ResponseHeader::build(200, None)?;
99 resp.insert_header("Content-Type", "text/plain")?;
100 resp.insert_header("Content-Length", key_authorization.len().to_string())?;
101
102 session
104 .write_response_header(Box::new(resp), false)
105 .await?;
106 session
107 .write_response_body(Some(Bytes::from(key_authorization)), true)
108 .await?;
109
110 return Err(Error::explain(
112 ErrorType::InternalError,
113 "ACME challenge served",
114 ));
115 } else {
116 warn!(
118 token = %token,
119 "ACME challenge token not found"
120 );
121 }
122 }
123 }
124
125 ctx.method = method.to_string();
126 ctx.path = path.to_string();
127 ctx.host = Some(host.to_string());
128
129 let route_match = {
131 let route_matcher = self.route_matcher.read();
132 let request_info = RequestInfo::new(method, path, host);
133 match route_matcher.match_request(&request_info) {
134 Some(m) => m,
135 None => return Ok(()), }
137 };
138
139 ctx.trace_id = self.get_trace_id(session);
140 ctx.route_id = Some(route_match.route_id.to_string());
141 ctx.route_config = Some(route_match.config.clone());
142
143 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
145 if let Ok(s) = traceparent.to_str() {
146 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
147 }
148 }
149
150 if let Some(tracer) = crate::otel::get_tracer() {
152 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
153 }
154
155 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
157 trace!(
158 correlation_id = %ctx.trace_id,
159 route_id = %route_match.route_id,
160 builtin_handler = ?route_match.config.builtin_handler,
161 "Handling builtin route in early_request_filter"
162 );
163
164 let handled = self
166 .handle_builtin_route(session, ctx, &route_match)
167 .await?;
168
169 if handled {
170 return Err(Error::explain(
172 ErrorType::InternalError,
173 "Builtin handler complete",
174 ));
175 }
176 }
177
178 Ok(())
179 }
180
181 async fn upstream_peer(
182 &self,
183 session: &mut Session,
184 ctx: &mut Self::CTX,
185 ) -> Result<Box<HttpPeer>, Box<Error>> {
186 self.reload_coordinator.inc_requests();
188
189 if ctx.config.is_none() {
191 ctx.config = Some(self.config_manager.current());
192 }
193
194 if ctx.client_ip.is_empty() {
196 ctx.client_ip = session
197 .client_addr()
198 .map(|a| a.to_string())
199 .unwrap_or_else(|| "unknown".to_string());
200 }
201
202 let req_header = session.req_header();
203
204 if ctx.method.is_empty() {
206 ctx.method = req_header.method.to_string();
207 ctx.path = req_header.uri.path().to_string();
208 ctx.query = req_header.uri.query().map(|q| q.to_string());
209 ctx.host = req_header
210 .headers
211 .get("host")
212 .and_then(|v| v.to_str().ok())
213 .map(|s| s.to_string());
214 }
215 ctx.user_agent = req_header
216 .headers
217 .get("user-agent")
218 .and_then(|v| v.to_str().ok())
219 .map(|s| s.to_string());
220 ctx.referer = req_header
221 .headers
222 .get("referer")
223 .and_then(|v| v.to_str().ok())
224 .map(|s| s.to_string());
225
226 trace!(
227 correlation_id = %ctx.trace_id,
228 client_ip = %ctx.client_ip,
229 "Request received, initializing context"
230 );
231
232 let route_match = if let Some(ref route_config) = ctx.route_config {
234 let route_id = ctx.route_id.as_deref().unwrap_or("");
235 crate::routing::RouteMatch {
236 route_id: sentinel_common::RouteId::new(route_id),
237 config: route_config.clone(),
238 }
239 } else {
240 let (match_result, route_duration) = {
242 let route_matcher = self.route_matcher.read();
243 let host = ctx.host.as_deref().unwrap_or("");
244
245 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
247
248 if route_matcher.needs_headers() {
250 request_info = request_info
251 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
252 }
253
254 if route_matcher.needs_query_params() {
256 request_info =
257 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
258 }
259
260 trace!(
261 correlation_id = %ctx.trace_id,
262 method = %request_info.method,
263 path = %request_info.path,
264 host = %request_info.host,
265 "Built request info for route matching"
266 );
267
268 let route_start = std::time::Instant::now();
269 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
270 warn!(
271 correlation_id = %ctx.trace_id,
272 method = %request_info.method,
273 path = %request_info.path,
274 host = %request_info.host,
275 "No matching route found for request"
276 );
277 Error::explain(ErrorType::InternalError, "No matching route found")
278 })?;
279 let route_duration = route_start.elapsed();
280 (route_match, route_duration)
282 };
283
284 ctx.route_id = Some(match_result.route_id.to_string());
285 ctx.route_config = Some(match_result.config.clone());
286
287 if ctx.trace_id.is_empty() {
289 ctx.trace_id = self.get_trace_id(session);
290
291 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
293 {
294 if let Ok(s) = traceparent.to_str() {
295 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
296 }
297 }
298
299 if let Some(tracer) = crate::otel::get_tracer() {
301 ctx.otel_span = Some(tracer.start_span(
302 &ctx.method,
303 &ctx.path,
304 ctx.trace_context.as_ref(),
305 ));
306 }
307 }
308
309 trace!(
310 correlation_id = %ctx.trace_id,
311 route_id = %match_result.route_id,
312 route_duration_us = route_duration.as_micros(),
313 service_type = ?match_result.config.service_type,
314 "Route matched"
315 );
316 match_result
317 };
318
319 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
321 trace!(
322 correlation_id = %ctx.trace_id,
323 route_id = %route_match.route_id,
324 builtin_handler = ?route_match.config.builtin_handler,
325 "Route type is builtin, skipping upstream"
326 );
327 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
329 return Err(Error::explain(
331 ErrorType::InternalError,
332 "Builtin handler handled in request_filter",
333 ));
334 }
335
336 if route_match.config.service_type == sentinel_config::ServiceType::Static {
338 trace!(
339 correlation_id = %ctx.trace_id,
340 route_id = %route_match.route_id,
341 "Route type is static, checking for static server"
342 );
343 if self
345 .static_servers
346 .get(route_match.route_id.as_str())
347 .await
348 .is_some()
349 {
350 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
352 info!(
353 correlation_id = %ctx.trace_id,
354 route_id = %route_match.route_id,
355 path = %ctx.path,
356 "Serving static file"
357 );
358 return Err(Error::explain(
360 ErrorType::InternalError,
361 "Static file serving handled in request_filter",
362 ));
363 }
364 }
365
366 let mut model_routing_applied = false;
369 if let Some(ref inference) = route_match.config.inference {
370 if let Some(ref model_routing) = inference.model_routing {
371 let model = model_routing::extract_model_from_headers(&req_header.headers);
373
374 if let Some(ref model_name) = model {
375 if let Some(routing_result) =
377 model_routing::find_upstream_for_model(model_routing, model_name)
378 {
379 debug!(
380 correlation_id = %ctx.trace_id,
381 route_id = %route_match.route_id,
382 model = %model_name,
383 upstream = %routing_result.upstream,
384 is_default = routing_result.is_default,
385 provider_override = ?routing_result.provider,
386 "Model-based routing selected upstream"
387 );
388
389 ctx.record_model_routing(
390 &routing_result.upstream,
391 Some(model_name.clone()),
392 routing_result.provider,
393 );
394 model_routing_applied = true;
395
396 if let Some(metrics) = get_model_routing_metrics() {
398 metrics.record_model_routed(
399 route_match.route_id.as_str(),
400 model_name,
401 &routing_result.upstream,
402 );
403 if routing_result.is_default {
404 metrics.record_default_upstream(route_match.route_id.as_str());
405 }
406 if let Some(provider) = routing_result.provider {
407 metrics.record_provider_override(
408 route_match.route_id.as_str(),
409 &routing_result.upstream,
410 provider.as_str(),
411 );
412 }
413 }
414 }
415 } else if let Some(ref default_upstream) = model_routing.default_upstream {
416 debug!(
418 correlation_id = %ctx.trace_id,
419 route_id = %route_match.route_id,
420 upstream = %default_upstream,
421 "Model-based routing using default upstream (no model header)"
422 );
423 ctx.record_model_routing(default_upstream, None, None);
424 model_routing_applied = true;
425
426 if let Some(metrics) = get_model_routing_metrics() {
428 metrics.record_no_model_header(route_match.route_id.as_str());
429 }
430 }
431 }
432 }
433
434 if !model_routing_applied {
436 if let Some(ref upstream) = route_match.config.upstream {
437 ctx.upstream = Some(upstream.clone());
438 trace!(
439 correlation_id = %ctx.trace_id,
440 route_id = %route_match.route_id,
441 upstream = %upstream,
442 "Upstream configured for route"
443 );
444 } else {
445 error!(
446 correlation_id = %ctx.trace_id,
447 route_id = %route_match.route_id,
448 "Route has no upstream configured"
449 );
450 return Err(Error::explain(
451 ErrorType::InternalError,
452 format!(
453 "Route '{}' has no upstream configured",
454 route_match.route_id
455 ),
456 ));
457 }
458 }
459
460 if let Some(ref fallback_config) = route_match.config.fallback {
463 let upstream_name = ctx.upstream.as_ref().unwrap();
464
465 let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
467 pool.has_healthy_targets().await
468 } else {
469 false };
471
472 let is_budget_exhausted = ctx.inference_budget_exhausted;
474
475 let current_model = ctx.inference_model.as_deref();
477
478 let evaluator = FallbackEvaluator::new(
480 fallback_config,
481 ctx.tried_upstreams(),
482 ctx.fallback_attempt,
483 );
484
485 if let Some(decision) = evaluator.should_fallback_before_request(
487 upstream_name,
488 is_healthy,
489 is_budget_exhausted,
490 current_model,
491 ) {
492 info!(
493 correlation_id = %ctx.trace_id,
494 route_id = %route_match.route_id,
495 from_upstream = %upstream_name,
496 to_upstream = %decision.next_upstream,
497 reason = %decision.reason,
498 fallback_attempt = ctx.fallback_attempt + 1,
499 "Triggering fallback routing"
500 );
501
502 if let Some(metrics) = get_fallback_metrics() {
504 metrics.record_fallback_attempt(
505 route_match.route_id.as_str(),
506 upstream_name,
507 &decision.next_upstream,
508 &decision.reason,
509 );
510 }
511
512 ctx.record_fallback(decision.reason, &decision.next_upstream);
514
515 if let Some((original, mapped)) = decision.model_mapping {
517 if let Some(metrics) = get_fallback_metrics() {
519 metrics.record_model_mapping(
520 route_match.route_id.as_str(),
521 &original,
522 &mapped,
523 );
524 }
525
526 ctx.record_model_mapping(original, mapped);
527 trace!(
528 correlation_id = %ctx.trace_id,
529 original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
530 mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
531 "Applied model mapping for fallback"
532 );
533 }
534 }
535 }
536
537 debug!(
538 correlation_id = %ctx.trace_id,
539 route_id = %route_match.route_id,
540 upstream = ?ctx.upstream,
541 method = %req_header.method,
542 path = %req_header.uri.path(),
543 host = ctx.host.as_deref().unwrap_or("-"),
544 client_ip = %ctx.client_ip,
545 "Processing request"
546 );
547
548 if ctx
550 .upstream
551 .as_ref()
552 .is_some_and(|u| u.starts_with("_static_"))
553 {
554 return Err(Error::explain(
556 ErrorType::InternalError,
557 "Static route should be handled in request_filter",
558 ));
559 }
560
561 let upstream_name = ctx
562 .upstream
563 .as_ref()
564 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
565
566 trace!(
567 correlation_id = %ctx.trace_id,
568 upstream = %upstream_name,
569 "Looking up upstream pool"
570 );
571
572 let pool = self
573 .upstream_pools
574 .get(upstream_name)
575 .await
576 .ok_or_else(|| {
577 error!(
578 correlation_id = %ctx.trace_id,
579 upstream = %upstream_name,
580 "Upstream pool not found"
581 );
582 Error::explain(
583 ErrorType::InternalError,
584 format!("Upstream pool '{}' not found", upstream_name),
585 )
586 })?;
587
588 let max_retries = route_match
590 .config
591 .retry_policy
592 .as_ref()
593 .map(|r| r.max_attempts)
594 .unwrap_or(1);
595
596 trace!(
597 correlation_id = %ctx.trace_id,
598 upstream = %upstream_name,
599 max_retries = max_retries,
600 "Starting upstream peer selection"
601 );
602
603 let mut last_error = None;
604 let selection_start = std::time::Instant::now();
605
606 for attempt in 1..=max_retries {
607 ctx.upstream_attempts = attempt;
608
609 trace!(
610 correlation_id = %ctx.trace_id,
611 upstream = %upstream_name,
612 attempt = attempt,
613 max_retries = max_retries,
614 "Attempting to select upstream peer"
615 );
616
617 match pool.select_peer_with_metadata(None).await {
618 Ok((peer, metadata)) => {
619 let selection_duration = selection_start.elapsed();
620 let peer_addr = peer.address().to_string();
622 ctx.selected_upstream_address = Some(peer_addr.clone());
623
624 if metadata.contains_key("sticky_session_new") {
626 ctx.sticky_session_new_assignment = true;
627 ctx.sticky_session_set_cookie =
628 metadata.get("sticky_set_cookie_header").cloned();
629 ctx.sticky_target_index = metadata
630 .get("sticky_target_index")
631 .and_then(|s| s.parse().ok());
632
633 trace!(
634 correlation_id = %ctx.trace_id,
635 sticky_target_index = ?ctx.sticky_target_index,
636 "New sticky session assignment, will set cookie"
637 );
638 }
639
640 debug!(
641 correlation_id = %ctx.trace_id,
642 upstream = %upstream_name,
643 peer_address = %peer_addr,
644 attempt = attempt,
645 selection_duration_us = selection_duration.as_micros(),
646 sticky_session_hit = metadata.contains_key("sticky_session_hit"),
647 sticky_session_new = ctx.sticky_session_new_assignment,
648 "Selected upstream peer"
649 );
650 return Ok(Box::new(peer));
651 }
652 Err(e) => {
653 warn!(
654 correlation_id = %ctx.trace_id,
655 upstream = %upstream_name,
656 attempt = attempt,
657 max_retries = max_retries,
658 error = %e,
659 "Failed to select upstream peer"
660 );
661 last_error = Some(e);
662
663 if attempt < max_retries {
664 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
666 trace!(
667 correlation_id = %ctx.trace_id,
668 backoff_ms = backoff.as_millis(),
669 "Backing off before retry"
670 );
671 sleep(backoff).await;
672 }
673 }
674 }
675 }
676
677 let selection_duration = selection_start.elapsed();
678 error!(
679 correlation_id = %ctx.trace_id,
680 upstream = %upstream_name,
681 attempts = max_retries,
682 selection_duration_ms = selection_duration.as_millis(),
683 last_error = ?last_error,
684 "All upstream selection attempts failed"
685 );
686
687 if ctx.used_fallback() {
689 if let Some(metrics) = get_fallback_metrics() {
690 metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
691 }
692 }
693
694 Err(Error::explain(
695 ErrorType::InternalError,
696 format!("All upstream attempts failed: {:?}", last_error),
697 ))
698 }
699
700 async fn request_filter(
701 &self,
702 session: &mut Session,
703 ctx: &mut Self::CTX,
704 ) -> Result<bool, Box<Error>> {
705 trace!(
706 correlation_id = %ctx.trace_id,
707 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
708 "Starting request filter phase"
709 );
710
711 if let Some(route_id) = ctx.route_id.as_deref() {
714 if self.rate_limit_manager.has_route_limiter(route_id) {
715 let rate_result = self.rate_limit_manager.check(
716 route_id,
717 &ctx.client_ip,
718 &ctx.path,
719 Option::<&NoHeaderAccessor>::None,
720 );
721
722 if rate_result.limit > 0 {
724 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
725 limit: rate_result.limit,
726 remaining: rate_result.remaining,
727 reset_at: rate_result.reset_at,
728 });
729 }
730
731 if !rate_result.allowed {
732 use sentinel_config::RateLimitAction;
733
734 match rate_result.action {
735 RateLimitAction::Reject => {
736 warn!(
737 correlation_id = %ctx.trace_id,
738 route_id = route_id,
739 client_ip = %ctx.client_ip,
740 limiter = %rate_result.limiter,
741 limit = rate_result.limit,
742 remaining = rate_result.remaining,
743 "Request rate limited"
744 );
745 self.metrics.record_blocked_request("rate_limited");
746
747 let audit_entry = AuditLogEntry::rate_limited(
749 &ctx.trace_id,
750 &ctx.method,
751 &ctx.path,
752 &ctx.client_ip,
753 &rate_result.limiter,
754 )
755 .with_route_id(route_id)
756 .with_status_code(rate_result.status_code);
757 self.log_manager.log_audit(&audit_entry);
758
759 let body = rate_result
761 .message
762 .unwrap_or_else(|| "Rate limit exceeded".to_string());
763
764 let retry_after = rate_result.reset_at.saturating_sub(
766 std::time::SystemTime::now()
767 .duration_since(std::time::UNIX_EPOCH)
768 .unwrap_or_default()
769 .as_secs(),
770 );
771 crate::http_helpers::write_rate_limit_error(
772 session,
773 rate_result.status_code,
774 &body,
775 rate_result.limit,
776 rate_result.remaining,
777 rate_result.reset_at,
778 retry_after,
779 )
780 .await?;
781 return Ok(true); }
783 RateLimitAction::LogOnly => {
784 debug!(
785 correlation_id = %ctx.trace_id,
786 route_id = route_id,
787 "Rate limit exceeded (log only mode)"
788 );
789 }
791 RateLimitAction::Delay => {
792 if let Some(delay_ms) = rate_result.suggested_delay_ms {
794 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
796
797 if actual_delay > 0 {
798 debug!(
799 correlation_id = %ctx.trace_id,
800 route_id = route_id,
801 suggested_delay_ms = delay_ms,
802 max_delay_ms = rate_result.max_delay_ms,
803 actual_delay_ms = actual_delay,
804 "Applying rate limit delay"
805 );
806
807 tokio::time::sleep(std::time::Duration::from_millis(
808 actual_delay,
809 ))
810 .await;
811 }
812 }
813 }
815 }
816 }
817 }
818 }
819
820 if let Some(route_id) = ctx.route_id.as_deref() {
823 if let Some(ref route_config) = ctx.route_config {
824 if route_config.service_type == sentinel_config::ServiceType::Inference
825 && self.inference_rate_limit_manager.has_route(route_id)
826 {
827 let headers = &session.req_header().headers;
830
831 let body = ctx.body_buffer.as_slice();
833
834 let rate_limit_key = &ctx.client_ip;
836
837 if let Some(check_result) = self.inference_rate_limit_manager.check(
838 route_id,
839 rate_limit_key,
840 headers,
841 body,
842 ) {
843 ctx.inference_rate_limit_enabled = true;
845 ctx.inference_estimated_tokens = check_result.estimated_tokens;
846 ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
847 ctx.inference_model = check_result.model.clone();
848
849 if !check_result.is_allowed() {
850 let retry_after_ms = check_result.retry_after_ms();
851 let retry_after_secs = retry_after_ms.div_ceil(1000);
852
853 warn!(
854 correlation_id = %ctx.trace_id,
855 route_id = route_id,
856 client_ip = %ctx.client_ip,
857 estimated_tokens = check_result.estimated_tokens,
858 model = ?check_result.model,
859 retry_after_ms = retry_after_ms,
860 "Inference rate limit exceeded (tokens)"
861 );
862 self.metrics.record_blocked_request("inference_rate_limited");
863
864 let audit_entry = AuditLogEntry::new(
866 &ctx.trace_id,
867 AuditEventType::RateLimitExceeded,
868 &ctx.method,
869 &ctx.path,
870 &ctx.client_ip,
871 )
872 .with_route_id(route_id)
873 .with_status_code(429)
874 .with_reason(format!(
875 "Token rate limit exceeded: estimated {} tokens, model={:?}",
876 check_result.estimated_tokens, check_result.model
877 ));
878 self.log_manager.log_audit(&audit_entry);
879
880 let body = "Token rate limit exceeded";
882 let reset_at = std::time::SystemTime::now()
883 .duration_since(std::time::UNIX_EPOCH)
884 .unwrap_or_default()
885 .as_secs()
886 + retry_after_secs;
887
888 crate::http_helpers::write_rate_limit_error(
890 session,
891 429,
892 body,
893 0, 0, reset_at,
896 retry_after_secs,
897 )
898 .await?;
899 return Ok(true); }
901
902 trace!(
903 correlation_id = %ctx.trace_id,
904 route_id = route_id,
905 estimated_tokens = check_result.estimated_tokens,
906 model = ?check_result.model,
907 "Inference rate limit check passed"
908 );
909
910 if self.inference_rate_limit_manager.has_budget(route_id) {
912 ctx.inference_budget_enabled = true;
913
914 if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
915 route_id,
916 rate_limit_key,
917 check_result.estimated_tokens,
918 ) {
919 if !budget_result.is_allowed() {
920 let retry_after_secs = budget_result.retry_after_secs();
921
922 warn!(
923 correlation_id = %ctx.trace_id,
924 route_id = route_id,
925 client_ip = %ctx.client_ip,
926 estimated_tokens = check_result.estimated_tokens,
927 retry_after_secs = retry_after_secs,
928 "Token budget exhausted"
929 );
930
931 ctx.inference_budget_exhausted = true;
932 self.metrics.record_blocked_request("budget_exhausted");
933
934 let audit_entry = AuditLogEntry::new(
936 &ctx.trace_id,
937 AuditEventType::RateLimitExceeded,
938 &ctx.method,
939 &ctx.path,
940 &ctx.client_ip,
941 )
942 .with_route_id(route_id)
943 .with_status_code(429)
944 .with_reason("Token budget exhausted".to_string());
945 self.log_manager.log_audit(&audit_entry);
946
947 let body = "Token budget exhausted";
949 let reset_at = std::time::SystemTime::now()
950 .duration_since(std::time::UNIX_EPOCH)
951 .unwrap_or_default()
952 .as_secs()
953 + retry_after_secs;
954
955 crate::http_helpers::write_rate_limit_error(
956 session,
957 429,
958 body,
959 0,
960 0,
961 reset_at,
962 retry_after_secs,
963 )
964 .await?;
965 return Ok(true);
966 }
967
968 let remaining = match &budget_result {
970 sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
971 sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
972 _ => 0,
973 };
974 ctx.inference_budget_remaining = Some(remaining);
975
976 if let Some(status) = self.inference_rate_limit_manager.budget_status(
978 route_id,
979 rate_limit_key,
980 ) {
981 ctx.inference_budget_period_reset = Some(status.period_end);
982 }
983
984 trace!(
985 correlation_id = %ctx.trace_id,
986 route_id = route_id,
987 budget_remaining = remaining,
988 "Token budget check passed"
989 );
990 }
991 }
992
993 if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
995 ctx.inference_cost_enabled = true;
996 }
997 }
998 }
999 }
1000 }
1001
1002 if let Some(ref route_config) = ctx.route_config {
1004 if let Some(ref inference) = route_config.inference {
1005 if let Some(ref guardrails) = inference.guardrails {
1006 if let Some(ref pi_config) = guardrails.prompt_injection {
1007 if pi_config.enabled && !ctx.body_buffer.is_empty() {
1008 ctx.guardrails_enabled = true;
1009
1010 if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1012 let result = self
1013 .guardrail_processor
1014 .check_prompt_injection(
1015 pi_config,
1016 &content,
1017 ctx.inference_model.as_deref(),
1018 ctx.route_id.as_deref(),
1019 &ctx.trace_id,
1020 )
1021 .await;
1022
1023 match result {
1024 PromptInjectionResult::Blocked {
1025 status,
1026 message,
1027 detections,
1028 } => {
1029 warn!(
1030 correlation_id = %ctx.trace_id,
1031 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1032 detection_count = detections.len(),
1033 "Prompt injection detected, blocking request"
1034 );
1035
1036 self.metrics.record_blocked_request("prompt_injection");
1037
1038 ctx.guardrail_detection_categories = detections
1040 .iter()
1041 .map(|d| d.category.clone())
1042 .collect();
1043
1044 let audit_entry = AuditLogEntry::new(
1046 &ctx.trace_id,
1047 AuditEventType::Blocked,
1048 &ctx.method,
1049 &ctx.path,
1050 &ctx.client_ip,
1051 )
1052 .with_route_id(
1053 ctx.route_id.as_deref().unwrap_or("unknown"),
1054 )
1055 .with_status_code(status)
1056 .with_reason("Prompt injection detected".to_string());
1057 self.log_manager.log_audit(&audit_entry);
1058
1059 crate::http_helpers::write_json_error(
1061 session,
1062 status,
1063 "prompt_injection_blocked",
1064 Some(&message),
1065 )
1066 .await?;
1067 return Ok(true);
1068 }
1069 PromptInjectionResult::Detected { detections } => {
1070 warn!(
1072 correlation_id = %ctx.trace_id,
1073 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1074 detection_count = detections.len(),
1075 "Prompt injection detected (logged only)"
1076 );
1077 ctx.guardrail_detection_categories = detections
1078 .iter()
1079 .map(|d| d.category.clone())
1080 .collect();
1081 }
1082 PromptInjectionResult::Warning { detections } => {
1083 ctx.guardrail_warning = true;
1085 ctx.guardrail_detection_categories = detections
1086 .iter()
1087 .map(|d| d.category.clone())
1088 .collect();
1089 debug!(
1090 correlation_id = %ctx.trace_id,
1091 "Prompt injection warning set"
1092 );
1093 }
1094 PromptInjectionResult::Clean => {
1095 trace!(
1096 correlation_id = %ctx.trace_id,
1097 "No prompt injection detected"
1098 );
1099 }
1100 PromptInjectionResult::Error { message } => {
1101 trace!(
1103 correlation_id = %ctx.trace_id,
1104 error = %message,
1105 "Prompt injection check error (failure mode applied)"
1106 );
1107 }
1108 }
1109 }
1110 }
1111 }
1112 }
1113 }
1114 }
1115
1116 if let Some(route_id) = ctx.route_id.as_deref() {
1118 if let Some(ref route_config) = ctx.route_config {
1119 for filter_id in &route_config.filters {
1120 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1121 ctx.geo_country_code = result.country_code.clone();
1123 ctx.geo_lookup_performed = true;
1124
1125 if !result.allowed {
1126 warn!(
1127 correlation_id = %ctx.trace_id,
1128 route_id = route_id,
1129 client_ip = %ctx.client_ip,
1130 country = ?result.country_code,
1131 filter_id = %filter_id,
1132 "Request blocked by geo filter"
1133 );
1134 self.metrics.record_blocked_request("geo_blocked");
1135
1136 let audit_entry = AuditLogEntry::new(
1138 &ctx.trace_id,
1139 AuditEventType::Blocked,
1140 &ctx.method,
1141 &ctx.path,
1142 &ctx.client_ip,
1143 )
1144 .with_route_id(route_id)
1145 .with_status_code(result.status_code)
1146 .with_reason(format!(
1147 "Geo blocked: country={}, filter={}",
1148 result.country_code.as_deref().unwrap_or("unknown"),
1149 filter_id
1150 ));
1151 self.log_manager.log_audit(&audit_entry);
1152
1153 let body = result
1155 .block_message
1156 .unwrap_or_else(|| "Access denied".to_string());
1157
1158 crate::http_helpers::write_error(
1159 session,
1160 result.status_code,
1161 &body,
1162 "text/plain",
1163 )
1164 .await?;
1165 return Ok(true); }
1167
1168 break;
1170 }
1171 }
1172 }
1173 }
1174
1175 let is_websocket_upgrade = session
1177 .req_header()
1178 .headers
1179 .get(http::header::UPGRADE)
1180 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1181 .unwrap_or(false);
1182
1183 if is_websocket_upgrade {
1184 ctx.is_websocket_upgrade = true;
1185
1186 if let Some(ref route_config) = ctx.route_config {
1188 if !route_config.websocket {
1189 warn!(
1190 correlation_id = %ctx.trace_id,
1191 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1192 client_ip = %ctx.client_ip,
1193 "WebSocket upgrade rejected: not enabled for route"
1194 );
1195
1196 self.metrics.record_blocked_request("websocket_not_enabled");
1197
1198 let audit_entry = AuditLogEntry::new(
1200 &ctx.trace_id,
1201 AuditEventType::Blocked,
1202 &ctx.method,
1203 &ctx.path,
1204 &ctx.client_ip,
1205 )
1206 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1207 .with_action("websocket_rejected")
1208 .with_reason("WebSocket not enabled for route");
1209 self.log_manager.log_audit(&audit_entry);
1210
1211 crate::http_helpers::write_error(
1213 session,
1214 403,
1215 "WebSocket not enabled for this route",
1216 "text/plain",
1217 )
1218 .await?;
1219 return Ok(true); }
1221
1222 debug!(
1223 correlation_id = %ctx.trace_id,
1224 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1225 "WebSocket upgrade request allowed"
1226 );
1227
1228 if route_config.websocket_inspection {
1230 let has_compression = session
1232 .req_header()
1233 .headers
1234 .get("Sec-WebSocket-Extensions")
1235 .and_then(|v| v.to_str().ok())
1236 .map(|s| s.contains("permessage-deflate"))
1237 .unwrap_or(false);
1238
1239 if has_compression {
1240 debug!(
1241 correlation_id = %ctx.trace_id,
1242 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1243 "WebSocket inspection skipped: permessage-deflate negotiated"
1244 );
1245 ctx.websocket_skip_inspection = true;
1246 } else {
1247 ctx.websocket_inspection_enabled = true;
1248
1249 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1251 sentinel_agent_protocol::EventType::WebSocketFrame,
1252 );
1253
1254 debug!(
1255 correlation_id = %ctx.trace_id,
1256 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1257 agent_count = ctx.websocket_inspection_agents.len(),
1258 "WebSocket frame inspection enabled"
1259 );
1260 }
1261 }
1262 }
1263 }
1264
1265 if let Some(route_config) = ctx.route_config.clone() {
1268 if route_config.service_type == sentinel_config::ServiceType::Static {
1269 trace!(
1270 correlation_id = %ctx.trace_id,
1271 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1272 "Handling static file route"
1273 );
1274 let route_match = crate::routing::RouteMatch {
1276 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1277 config: route_config.clone(),
1278 };
1279 return self.handle_static_route(session, ctx, &route_match).await;
1280 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1281 trace!(
1282 correlation_id = %ctx.trace_id,
1283 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1284 builtin_handler = ?route_config.builtin_handler,
1285 "Handling builtin route"
1286 );
1287 let route_match = crate::routing::RouteMatch {
1289 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1290 config: route_config.clone(),
1291 };
1292 return self.handle_builtin_route(session, ctx, &route_match).await;
1293 }
1294 }
1295
1296 if let Some(route_id) = ctx.route_id.clone() {
1298 if let Some(validator) = self.validators.get(&route_id).await {
1299 trace!(
1300 correlation_id = %ctx.trace_id,
1301 route_id = %route_id,
1302 "Running API schema validation"
1303 );
1304 if let Some(result) = self
1305 .validate_api_request(session, ctx, &route_id, &validator)
1306 .await?
1307 {
1308 debug!(
1309 correlation_id = %ctx.trace_id,
1310 route_id = %route_id,
1311 validation_passed = result,
1312 "API validation complete"
1313 );
1314 return Ok(result);
1315 }
1316 }
1317 }
1318
1319 let client_addr = session
1321 .client_addr()
1322 .map(|a| format!("{}", a))
1323 .unwrap_or_else(|| "unknown".to_string());
1324 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1325
1326 let req_header = session.req_header_mut();
1327
1328 req_header
1330 .insert_header("X-Correlation-Id", &ctx.trace_id)
1331 .ok();
1332 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1333
1334 let config = ctx
1336 .config
1337 .get_or_insert_with(|| self.config_manager.current());
1338
1339 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
1344 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1345 && header_count > config.limits.max_header_count
1346 {
1347 warn!(
1348 correlation_id = %ctx.trace_id,
1349 header_count = header_count,
1350 limit = config.limits.max_header_count,
1351 "Request blocked: exceeds header count limit"
1352 );
1353
1354 self.metrics.record_blocked_request("header_count_exceeded");
1355 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1356 }
1357
1358 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1360 let total_header_size: usize = req_header
1361 .headers
1362 .iter()
1363 .map(|(k, v)| k.as_str().len() + v.len())
1364 .sum();
1365
1366 if total_header_size > config.limits.max_header_size_bytes {
1367 warn!(
1368 correlation_id = %ctx.trace_id,
1369 header_size = total_header_size,
1370 limit = config.limits.max_header_size_bytes,
1371 "Request blocked: exceeds header size limit"
1372 );
1373
1374 self.metrics.record_blocked_request("header_size_exceeded");
1375 return Err(Error::explain(
1376 ErrorType::InternalError,
1377 "Headers too large",
1378 ));
1379 }
1380 }
1381
1382 trace!(
1384 correlation_id = %ctx.trace_id,
1385 "Processing request through agents"
1386 );
1387 if let Err(e) = self
1388 .process_agents(session, ctx, &client_addr, client_port)
1389 .await
1390 {
1391 if let ErrorType::HTTPStatus(status) = e.etype() {
1394 let error_msg = e.to_string();
1396 let body = error_msg
1397 .split("context:")
1398 .nth(1)
1399 .map(|s| s.trim())
1400 .unwrap_or("Request blocked");
1401 debug!(
1402 correlation_id = %ctx.trace_id,
1403 status = status,
1404 body = %body,
1405 "Sending HTTP error response for agent block"
1406 );
1407 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1408 return Ok(true); }
1410 return Err(e);
1412 }
1413
1414 trace!(
1415 correlation_id = %ctx.trace_id,
1416 "Request filter phase complete, forwarding to upstream"
1417 );
1418
1419 Ok(false) }
1421
1422 async fn request_body_filter(
1429 &self,
1430 _session: &mut Session,
1431 body: &mut Option<Bytes>,
1432 end_of_stream: bool,
1433 ctx: &mut Self::CTX,
1434 ) -> Result<(), Box<Error>> {
1435 use sentinel_config::BodyStreamingMode;
1436
1437 if ctx.is_websocket_upgrade {
1439 if let Some(ref handler) = ctx.websocket_handler {
1440 let result = handler.process_client_data(body.take()).await;
1441 match result {
1442 crate::websocket::ProcessResult::Forward(data) => {
1443 *body = data;
1444 }
1445 crate::websocket::ProcessResult::Close(reason) => {
1446 warn!(
1447 correlation_id = %ctx.trace_id,
1448 code = reason.code,
1449 reason = %reason.reason,
1450 "WebSocket connection closed by agent (client->server)"
1451 );
1452 return Err(Error::explain(
1454 ErrorType::InternalError,
1455 format!("WebSocket closed: {} {}", reason.code, reason.reason),
1456 ));
1457 }
1458 }
1459 }
1460 return Ok(());
1462 }
1463
1464 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1466 if chunk_len > 0 {
1467 ctx.request_body_bytes += chunk_len as u64;
1468
1469 trace!(
1470 correlation_id = %ctx.trace_id,
1471 chunk_size = chunk_len,
1472 total_body_bytes = ctx.request_body_bytes,
1473 end_of_stream = end_of_stream,
1474 streaming_mode = ?ctx.request_body_streaming_mode,
1475 "Processing request body chunk"
1476 );
1477
1478 let config = ctx
1480 .config
1481 .get_or_insert_with(|| self.config_manager.current());
1482 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1483 warn!(
1484 correlation_id = %ctx.trace_id,
1485 body_bytes = ctx.request_body_bytes,
1486 limit = config.limits.max_body_size_bytes,
1487 "Request body size limit exceeded"
1488 );
1489 self.metrics.record_blocked_request("body_size_exceeded");
1490 return Err(Error::explain(
1491 ErrorType::InternalError,
1492 "Request body too large",
1493 ));
1494 }
1495 }
1496
1497 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1499 let config = ctx
1500 .config
1501 .get_or_insert_with(|| self.config_manager.current());
1502 let max_inspection_bytes = config
1503 .waf
1504 .as_ref()
1505 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1506 .unwrap_or(1024 * 1024);
1507
1508 match ctx.request_body_streaming_mode {
1509 BodyStreamingMode::Stream => {
1510 if body.is_some() {
1512 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1513 .await?;
1514 } else if end_of_stream && ctx.agent_needs_more {
1515 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1517 .await?;
1518 }
1519 }
1520 BodyStreamingMode::Hybrid { buffer_threshold } => {
1521 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1523 if let Some(ref chunk) = body {
1525 let bytes_to_buffer = std::cmp::min(
1526 chunk.len(),
1527 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1528 );
1529 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1530 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1531
1532 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1534 {
1535 self.send_buffered_body_to_agents(
1537 end_of_stream && chunk.len() == bytes_to_buffer,
1538 ctx,
1539 )
1540 .await?;
1541 ctx.body_buffer.clear();
1542
1543 if bytes_to_buffer < chunk.len() {
1545 let remaining = chunk.slice(bytes_to_buffer..);
1546 let mut remaining_body = Some(remaining);
1547 self.process_body_chunk_streaming(
1548 &mut remaining_body,
1549 end_of_stream,
1550 ctx,
1551 )
1552 .await?;
1553 }
1554 }
1555 }
1556 } else {
1557 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1559 .await?;
1560 }
1561 }
1562 BodyStreamingMode::Buffer => {
1563 if let Some(ref chunk) = body {
1565 if ctx.body_bytes_inspected < max_inspection_bytes {
1566 let bytes_to_inspect = std::cmp::min(
1567 chunk.len() as u64,
1568 max_inspection_bytes - ctx.body_bytes_inspected,
1569 ) as usize;
1570
1571 ctx.body_buffer
1572 .extend_from_slice(&chunk[..bytes_to_inspect]);
1573 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1574
1575 trace!(
1576 correlation_id = %ctx.trace_id,
1577 bytes_inspected = ctx.body_bytes_inspected,
1578 max_inspection_bytes = max_inspection_bytes,
1579 buffer_size = ctx.body_buffer.len(),
1580 "Buffering body for agent inspection"
1581 );
1582 }
1583 }
1584
1585 let should_send =
1587 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1588 if should_send && !ctx.body_buffer.is_empty() {
1589 self.send_buffered_body_to_agents(end_of_stream, ctx)
1590 .await?;
1591 ctx.body_buffer.clear();
1592 }
1593 }
1594 }
1595 }
1596
1597 if end_of_stream {
1598 trace!(
1599 correlation_id = %ctx.trace_id,
1600 total_body_bytes = ctx.request_body_bytes,
1601 bytes_inspected = ctx.body_bytes_inspected,
1602 "Request body complete"
1603 );
1604 }
1605
1606 Ok(())
1607 }
1608
1609 async fn response_filter(
1610 &self,
1611 _session: &mut Session,
1612 upstream_response: &mut ResponseHeader,
1613 ctx: &mut Self::CTX,
1614 ) -> Result<(), Box<Error>> {
1615 let status = upstream_response.status.as_u16();
1616 let duration = ctx.elapsed();
1617
1618 trace!(
1619 correlation_id = %ctx.trace_id,
1620 status = status,
1621 "Starting response filter phase"
1622 );
1623
1624 if status == 101 && ctx.is_websocket_upgrade {
1626 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1627 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1629 self.agent_manager.clone(),
1630 ctx.route_id
1631 .clone()
1632 .unwrap_or_else(|| "unknown".to_string()),
1633 ctx.trace_id.clone(),
1634 ctx.client_ip.clone(),
1635 100, Some(self.metrics.clone()),
1637 );
1638
1639 let handler = crate::websocket::WebSocketHandler::new(
1640 std::sync::Arc::new(inspector),
1641 1024 * 1024, );
1643
1644 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1645
1646 info!(
1647 correlation_id = %ctx.trace_id,
1648 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1649 agent_count = ctx.websocket_inspection_agents.len(),
1650 "WebSocket upgrade successful, frame inspection enabled"
1651 );
1652 } else if ctx.websocket_skip_inspection {
1653 debug!(
1654 correlation_id = %ctx.trace_id,
1655 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1656 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1657 );
1658 } else {
1659 debug!(
1660 correlation_id = %ctx.trace_id,
1661 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1662 "WebSocket upgrade successful"
1663 );
1664 }
1665 }
1666
1667 trace!(
1669 correlation_id = %ctx.trace_id,
1670 "Applying security headers"
1671 );
1672 self.apply_security_headers(upstream_response).ok();
1673
1674 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1676
1677 if let Some(ref rate_info) = ctx.rate_limit_info {
1679 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1680 upstream_response
1681 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1682 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1683 }
1684
1685 if ctx.inference_budget_enabled {
1687 if let Some(remaining) = ctx.inference_budget_remaining {
1688 upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1689 }
1690 if let Some(period_reset) = ctx.inference_budget_period_reset {
1691 let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1693 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1694 .unwrap_or_else(|| period_reset.to_string());
1695 upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1696 }
1697 }
1698
1699 if let Some(ref country_code) = ctx.geo_country_code {
1701 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1702 }
1703
1704 if ctx.sticky_session_new_assignment {
1706 if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1707 upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1708 trace!(
1709 correlation_id = %ctx.trace_id,
1710 sticky_target_index = ?ctx.sticky_target_index,
1711 "Added sticky session Set-Cookie header"
1712 );
1713 }
1714 }
1715
1716 if ctx.guardrail_warning {
1718 upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1719 }
1720
1721 if ctx.used_fallback() {
1723 upstream_response.insert_header("X-Fallback-Used", "true")?;
1724
1725 if let Some(ref upstream) = ctx.upstream {
1726 upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1727 }
1728
1729 if let Some(ref reason) = ctx.fallback_reason {
1730 upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1731 }
1732
1733 if let Some(ref original) = ctx.original_upstream {
1734 upstream_response.insert_header("X-Original-Upstream", original)?;
1735 }
1736
1737 if let Some(ref mapping) = ctx.model_mapping_applied {
1738 upstream_response.insert_header(
1739 "X-Model-Mapping",
1740 format!("{} -> {}", mapping.0, mapping.1),
1741 )?;
1742 }
1743
1744 trace!(
1745 correlation_id = %ctx.trace_id,
1746 fallback_attempt = ctx.fallback_attempt,
1747 fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1748 original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1749 "Added fallback response headers"
1750 );
1751
1752 if status < 400 {
1754 if let Some(metrics) = get_fallback_metrics() {
1755 metrics.record_fallback_success(
1756 ctx.route_id.as_deref().unwrap_or("unknown"),
1757 ctx.upstream.as_deref().unwrap_or("unknown"),
1758 );
1759 }
1760 }
1761 }
1762
1763 if ctx.inference_rate_limit_enabled {
1765 let content_type = upstream_response
1767 .headers
1768 .get("content-type")
1769 .and_then(|ct| ct.to_str().ok());
1770
1771 if is_sse_response(content_type) {
1772 let provider = ctx
1774 .route_config
1775 .as_ref()
1776 .and_then(|r| r.inference.as_ref())
1777 .map(|i| i.provider.clone())
1778 .unwrap_or_default();
1779
1780 ctx.inference_streaming_response = true;
1781 ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1782 provider,
1783 ctx.inference_model.clone(),
1784 ));
1785
1786 trace!(
1787 correlation_id = %ctx.trace_id,
1788 content_type = ?content_type,
1789 model = ?ctx.inference_model,
1790 "Initialized streaming token counter for SSE response"
1791 );
1792 }
1793 }
1794
1795 if status >= 400 {
1797 trace!(
1798 correlation_id = %ctx.trace_id,
1799 status = status,
1800 "Handling error response"
1801 );
1802 self.handle_error_response(upstream_response, ctx).await?;
1803 }
1804
1805 self.metrics.record_request(
1807 ctx.route_id.as_deref().unwrap_or("unknown"),
1808 &ctx.method,
1809 status,
1810 duration,
1811 );
1812
1813 if let Some(ref mut span) = ctx.otel_span {
1815 span.set_status(status);
1816 if let Some(ref upstream) = ctx.upstream {
1817 span.set_upstream(upstream, "");
1818 }
1819 if status >= 500 {
1820 span.record_error(&format!("HTTP {}", status));
1821 }
1822 }
1823
1824 if let Some(ref upstream) = ctx.upstream {
1826 let success = status < 500;
1827
1828 trace!(
1829 correlation_id = %ctx.trace_id,
1830 upstream = %upstream,
1831 success = success,
1832 status = status,
1833 "Recording passive health check result"
1834 );
1835
1836 let error_msg = if !success {
1837 Some(format!("HTTP {}", status))
1838 } else {
1839 None
1840 };
1841 self.passive_health
1842 .record_outcome(upstream, success, error_msg.as_deref())
1843 .await;
1844
1845 if let Some(pool) = self.upstream_pools.get(upstream).await {
1847 pool.report_result(upstream, success).await;
1848 }
1849
1850 if !success {
1851 warn!(
1852 correlation_id = %ctx.trace_id,
1853 upstream = %upstream,
1854 status = status,
1855 "Upstream returned error status"
1856 );
1857 }
1858 }
1859
1860 if status >= 500 {
1862 error!(
1863 correlation_id = %ctx.trace_id,
1864 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1865 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1866 method = %ctx.method,
1867 path = %ctx.path,
1868 status = status,
1869 duration_ms = duration.as_millis(),
1870 attempts = ctx.upstream_attempts,
1871 "Request completed with server error"
1872 );
1873 } else if status >= 400 {
1874 warn!(
1875 correlation_id = %ctx.trace_id,
1876 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1877 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1878 method = %ctx.method,
1879 path = %ctx.path,
1880 status = status,
1881 duration_ms = duration.as_millis(),
1882 "Request completed with client error"
1883 );
1884 } else {
1885 debug!(
1886 correlation_id = %ctx.trace_id,
1887 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1888 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1889 method = %ctx.method,
1890 path = %ctx.path,
1891 status = status,
1892 duration_ms = duration.as_millis(),
1893 attempts = ctx.upstream_attempts,
1894 "Request completed"
1895 );
1896 }
1897
1898 Ok(())
1899 }
1900
1901 async fn upstream_request_filter(
1904 &self,
1905 _session: &mut Session,
1906 upstream_request: &mut pingora::http::RequestHeader,
1907 ctx: &mut Self::CTX,
1908 ) -> Result<()>
1909 where
1910 Self::CTX: Send + Sync,
1911 {
1912 trace!(
1913 correlation_id = %ctx.trace_id,
1914 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1915 "Applying upstream request modifications"
1916 );
1917
1918 upstream_request
1920 .insert_header("X-Trace-Id", &ctx.trace_id)
1921 .ok();
1922
1923 if let Some(ref span) = ctx.otel_span {
1925 let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1926 let traceparent =
1927 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1928 upstream_request
1929 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1930 .ok();
1931 }
1932
1933 upstream_request
1935 .insert_header("X-Forwarded-By", "Sentinel")
1936 .ok();
1937
1938 if let Some(ref route_config) = ctx.route_config {
1942 let mods = &route_config.policies.request_headers;
1943
1944 for (name, value) in &mods.set {
1946 upstream_request.insert_header(name.clone(), value.as_str()).ok();
1947 }
1948
1949 for (name, value) in &mods.add {
1951 upstream_request.append_header(name.clone(), value.as_str()).ok();
1952 }
1953
1954 for name in &mods.remove {
1956 upstream_request.remove_header(name);
1957 }
1958
1959 trace!(
1960 correlation_id = %ctx.trace_id,
1961 "Applied request header modifications"
1962 );
1963 }
1964
1965 upstream_request.remove_header("X-Internal-Token");
1967 upstream_request.remove_header("Authorization-Internal");
1968
1969 if let Some(ref route_config) = ctx.route_config {
1972 if let Some(ref shadow_config) = route_config.shadow {
1973 let pools_snapshot = self.upstream_pools.snapshot().await;
1975 let upstream_pools = std::sync::Arc::new(pools_snapshot);
1976
1977 let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1979
1980 let shadow_manager = crate::shadow::ShadowManager::new(
1982 upstream_pools,
1983 shadow_config.clone(),
1984 Some(std::sync::Arc::clone(&self.metrics)),
1985 route_id,
1986 );
1987
1988 if shadow_manager.should_shadow(upstream_request) {
1990 trace!(
1991 correlation_id = %ctx.trace_id,
1992 shadow_upstream = %shadow_config.upstream,
1993 percentage = shadow_config.percentage,
1994 "Shadowing request"
1995 );
1996
1997 let shadow_headers = upstream_request.clone();
1999
2000 let shadow_ctx = crate::upstream::RequestContext {
2002 client_ip: ctx.client_ip.parse().ok(),
2003 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
2005 method: ctx.method.clone(),
2006 };
2007
2008 let buffer_body = shadow_config.buffer_body
2010 && crate::shadow::should_buffer_method(&ctx.method);
2011
2012 if buffer_body {
2013 trace!(
2017 correlation_id = %ctx.trace_id,
2018 "Deferring shadow request until body is buffered"
2019 );
2020 ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2021 headers: shadow_headers,
2022 manager: std::sync::Arc::new(shadow_manager),
2023 request_ctx: shadow_ctx,
2024 include_body: true,
2025 });
2026 if !ctx.body_inspection_enabled {
2029 ctx.body_inspection_enabled = true;
2030 }
2033 } else {
2034 shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2036 ctx.shadow_sent = true;
2037 }
2038 }
2039 }
2040 }
2041
2042 Ok(())
2043 }
2044
2045 fn response_body_filter(
2051 &self,
2052 _session: &mut Session,
2053 body: &mut Option<Bytes>,
2054 end_of_stream: bool,
2055 ctx: &mut Self::CTX,
2056 ) -> Result<Option<Duration>, Box<Error>> {
2057 if ctx.is_websocket_upgrade {
2060 if let Some(ref handler) = ctx.websocket_handler {
2061 let handler = handler.clone();
2062 let data = body.take();
2063
2064 let result = tokio::task::block_in_place(|| {
2067 tokio::runtime::Handle::current()
2068 .block_on(async { handler.process_server_data(data).await })
2069 });
2070
2071 match result {
2072 crate::websocket::ProcessResult::Forward(data) => {
2073 *body = data;
2074 }
2075 crate::websocket::ProcessResult::Close(reason) => {
2076 warn!(
2077 correlation_id = %ctx.trace_id,
2078 code = reason.code,
2079 reason = %reason.reason,
2080 "WebSocket connection closed by agent (server->client)"
2081 );
2082 let close_frame =
2085 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2086 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2087 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2088 *body = Some(Bytes::from(encoded));
2089 }
2090 }
2091 }
2092 }
2093 return Ok(None);
2095 }
2096
2097 if let Some(ref chunk) = body {
2099 ctx.response_bytes += chunk.len() as u64;
2100
2101 trace!(
2102 correlation_id = %ctx.trace_id,
2103 chunk_size = chunk.len(),
2104 total_response_bytes = ctx.response_bytes,
2105 end_of_stream = end_of_stream,
2106 "Processing response body chunk"
2107 );
2108
2109 if let Some(ref mut counter) = ctx.inference_streaming_counter {
2111 let result = counter.process_chunk(chunk);
2112
2113 if result.content.is_some() || result.is_done {
2114 trace!(
2115 correlation_id = %ctx.trace_id,
2116 has_content = result.content.is_some(),
2117 is_done = result.is_done,
2118 chunks_processed = counter.chunks_processed(),
2119 accumulated_content_len = counter.content().len(),
2120 "Processed SSE chunk for token counting"
2121 );
2122 }
2123 }
2124
2125 if ctx.response_body_inspection_enabled
2129 && !ctx.response_body_inspection_agents.is_empty()
2130 {
2131 let config = ctx
2132 .config
2133 .get_or_insert_with(|| self.config_manager.current());
2134 let max_inspection_bytes = config
2135 .waf
2136 .as_ref()
2137 .map(|w| w.body_inspection.max_inspection_bytes as u64)
2138 .unwrap_or(1024 * 1024);
2139
2140 if ctx.response_body_bytes_inspected < max_inspection_bytes {
2141 let bytes_to_inspect = std::cmp::min(
2142 chunk.len() as u64,
2143 max_inspection_bytes - ctx.response_body_bytes_inspected,
2144 ) as usize;
2145
2146 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2150 ctx.response_body_chunk_index += 1;
2151
2152 trace!(
2153 correlation_id = %ctx.trace_id,
2154 bytes_inspected = ctx.response_body_bytes_inspected,
2155 max_inspection_bytes = max_inspection_bytes,
2156 chunk_index = ctx.response_body_chunk_index,
2157 "Tracking response body for inspection"
2158 );
2159 }
2160 }
2161 }
2162
2163 if end_of_stream {
2164 trace!(
2165 correlation_id = %ctx.trace_id,
2166 total_response_bytes = ctx.response_bytes,
2167 response_bytes_inspected = ctx.response_body_bytes_inspected,
2168 "Response body complete"
2169 );
2170 }
2171
2172 Ok(None)
2174 }
2175
2176 async fn connected_to_upstream(
2179 &self,
2180 _session: &mut Session,
2181 reused: bool,
2182 peer: &HttpPeer,
2183 #[cfg(unix)] _fd: RawFd,
2184 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2185 digest: Option<&Digest>,
2186 ctx: &mut Self::CTX,
2187 ) -> Result<(), Box<Error>> {
2188 ctx.connection_reused = reused;
2190
2191 if reused {
2193 trace!(
2194 correlation_id = %ctx.trace_id,
2195 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2196 peer_address = %peer.address(),
2197 "Reusing existing upstream connection"
2198 );
2199 } else {
2200 debug!(
2201 correlation_id = %ctx.trace_id,
2202 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2203 peer_address = %peer.address(),
2204 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2205 "Established new upstream connection"
2206 );
2207 }
2208
2209 Ok(())
2210 }
2211
2212 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2222 let route_id = match ctx.route_id.as_deref() {
2224 Some(id) => id,
2225 None => {
2226 trace!(
2227 correlation_id = %ctx.trace_id,
2228 "Cache filter: no route ID, skipping cache"
2229 );
2230 return Ok(());
2231 }
2232 };
2233
2234 if !self.cache_manager.is_enabled(route_id) {
2236 trace!(
2237 correlation_id = %ctx.trace_id,
2238 route_id = %route_id,
2239 "Cache disabled for route"
2240 );
2241 return Ok(());
2242 }
2243
2244 if !self
2246 .cache_manager
2247 .is_method_cacheable(route_id, &ctx.method)
2248 {
2249 trace!(
2250 correlation_id = %ctx.trace_id,
2251 route_id = %route_id,
2252 method = %ctx.method,
2253 "Method not cacheable"
2254 );
2255 return Ok(());
2256 }
2257
2258 debug!(
2260 correlation_id = %ctx.trace_id,
2261 route_id = %route_id,
2262 method = %ctx.method,
2263 path = %ctx.path,
2264 "Enabling HTTP caching for request"
2265 );
2266
2267 let storage = get_cache_storage();
2269 let eviction = get_cache_eviction();
2270 let cache_lock = get_cache_lock();
2271
2272 session.cache.enable(
2274 storage,
2275 Some(eviction),
2276 None, Some(cache_lock),
2278 None, );
2280
2281 ctx.cache_eligible = true;
2283
2284 trace!(
2285 correlation_id = %ctx.trace_id,
2286 route_id = %route_id,
2287 cache_enabled = session.cache.enabled(),
2288 "Cache enabled for request"
2289 );
2290
2291 Ok(())
2292 }
2293
2294 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2299 let req_header = session.req_header();
2300 let method = req_header.method.as_str();
2301 let path = req_header.uri.path();
2302 let host = ctx.host.as_deref().unwrap_or("unknown");
2303 let query = req_header.uri.query();
2304
2305 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2307
2308 trace!(
2309 correlation_id = %ctx.trace_id,
2310 cache_key = %key_string,
2311 "Generated cache key"
2312 );
2313
2314 Ok(CacheKey::default(req_header))
2317 }
2318
2319 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2324 session.cache.cache_miss();
2326
2327 if let Some(route_id) = ctx.route_id.as_deref() {
2329 self.cache_manager.stats().record_miss();
2330
2331 trace!(
2332 correlation_id = %ctx.trace_id,
2333 route_id = %route_id,
2334 path = %ctx.path,
2335 "Cache miss"
2336 );
2337 }
2338 }
2339
2340 async fn cache_hit_filter(
2346 &self,
2347 session: &mut Session,
2348 meta: &CacheMeta,
2349 _hit_handler: &mut HitHandler,
2350 is_fresh: bool,
2351 ctx: &mut Self::CTX,
2352 ) -> Result<Option<ForcedFreshness>>
2353 where
2354 Self::CTX: Send + Sync,
2355 {
2356 let req_header = session.req_header();
2358 let method = req_header.method.as_str();
2359 let path = req_header.uri.path();
2360 let host = req_header.uri.host().unwrap_or("localhost");
2361 let query = req_header.uri.query();
2362
2363 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2365
2366 if self.cache_manager.should_invalidate(&cache_key) {
2368 info!(
2369 correlation_id = %ctx.trace_id,
2370 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2371 cache_key = %cache_key,
2372 "Cache entry invalidated by purge request"
2373 );
2374 return Ok(Some(ForcedFreshness::ForceExpired));
2376 }
2377
2378 if is_fresh {
2380 self.cache_manager.stats().record_hit();
2381
2382 debug!(
2383 correlation_id = %ctx.trace_id,
2384 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2385 is_fresh = is_fresh,
2386 "Cache hit (fresh)"
2387 );
2388 } else {
2389 trace!(
2390 correlation_id = %ctx.trace_id,
2391 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2392 is_fresh = is_fresh,
2393 "Cache hit (stale)"
2394 );
2395 }
2396
2397 Ok(None)
2399 }
2400
2401 fn response_cache_filter(
2406 &self,
2407 _session: &Session,
2408 resp: &ResponseHeader,
2409 ctx: &mut Self::CTX,
2410 ) -> Result<RespCacheable> {
2411 let route_id = match ctx.route_id.as_deref() {
2412 Some(id) => id,
2413 None => {
2414 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2415 "no_route",
2416 )));
2417 }
2418 };
2419
2420 if !self.cache_manager.is_enabled(route_id) {
2422 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2423 "disabled",
2424 )));
2425 }
2426
2427 let status = resp.status.as_u16();
2428
2429 if !self.cache_manager.is_status_cacheable(route_id, status) {
2431 trace!(
2432 correlation_id = %ctx.trace_id,
2433 route_id = %route_id,
2434 status = status,
2435 "Status code not cacheable"
2436 );
2437 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2438 }
2439
2440 if let Some(cache_control) = resp.headers.get("cache-control") {
2442 if let Ok(cc_str) = cache_control.to_str() {
2443 if crate::cache::CacheManager::is_no_cache(cc_str) {
2444 trace!(
2445 correlation_id = %ctx.trace_id,
2446 route_id = %route_id,
2447 cache_control = %cc_str,
2448 "Response has no-cache directive"
2449 );
2450 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2451 }
2452 }
2453 }
2454
2455 let cache_control = resp
2457 .headers
2458 .get("cache-control")
2459 .and_then(|v| v.to_str().ok());
2460 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2461
2462 if ttl.is_zero() {
2463 trace!(
2464 correlation_id = %ctx.trace_id,
2465 route_id = %route_id,
2466 "TTL is zero, not caching"
2467 );
2468 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2469 }
2470
2471 let config = self
2473 .cache_manager
2474 .get_route_config(route_id)
2475 .unwrap_or_default();
2476
2477 let now = std::time::SystemTime::now();
2479 let fresh_until = now + ttl;
2480
2481 let header = resp.clone();
2483
2484 let cache_meta = CacheMeta::new(
2486 fresh_until,
2487 now,
2488 config.stale_while_revalidate_secs as u32,
2489 config.stale_if_error_secs as u32,
2490 header,
2491 );
2492
2493 self.cache_manager.stats().record_store();
2495
2496 debug!(
2497 correlation_id = %ctx.trace_id,
2498 route_id = %route_id,
2499 status = status,
2500 ttl_secs = ttl.as_secs(),
2501 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2502 stale_if_error_secs = config.stale_if_error_secs,
2503 "Caching response"
2504 );
2505
2506 Ok(RespCacheable::Cacheable(cache_meta))
2507 }
2508
2509 fn should_serve_stale(
2513 &self,
2514 _session: &mut Session,
2515 ctx: &mut Self::CTX,
2516 error: Option<&Error>,
2517 ) -> bool {
2518 let route_id = match ctx.route_id.as_deref() {
2519 Some(id) => id,
2520 None => return false,
2521 };
2522
2523 let config = match self.cache_manager.get_route_config(route_id) {
2525 Some(c) => c,
2526 None => return false,
2527 };
2528
2529 if let Some(e) = error {
2531 if e.esource() == &pingora::ErrorSource::Upstream {
2533 debug!(
2534 correlation_id = %ctx.trace_id,
2535 route_id = %route_id,
2536 error = %e,
2537 stale_if_error_secs = config.stale_if_error_secs,
2538 "Considering stale-if-error"
2539 );
2540 return config.stale_if_error_secs > 0;
2541 }
2542 }
2543
2544 if error.is_none() && config.stale_while_revalidate_secs > 0 {
2546 trace!(
2547 correlation_id = %ctx.trace_id,
2548 route_id = %route_id,
2549 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2550 "Allowing stale-while-revalidate"
2551 );
2552 return true;
2553 }
2554
2555 false
2556 }
2557
2558 fn range_header_filter(
2568 &self,
2569 session: &mut Session,
2570 response: &mut ResponseHeader,
2571 ctx: &mut Self::CTX,
2572 ) -> pingora_proxy::RangeType
2573 where
2574 Self::CTX: Send + Sync,
2575 {
2576 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2578 matches!(
2580 config.service_type,
2581 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2582 )
2583 });
2584
2585 if !supports_range {
2586 trace!(
2587 correlation_id = %ctx.trace_id,
2588 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2589 "Range request not supported for this route type"
2590 );
2591 return pingora_proxy::RangeType::None;
2592 }
2593
2594 let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
2596
2597 match &range_type {
2598 pingora_proxy::RangeType::None => {
2599 trace!(
2600 correlation_id = %ctx.trace_id,
2601 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2602 "No range request or not applicable"
2603 );
2604 }
2605 pingora_proxy::RangeType::Single(range) => {
2606 trace!(
2607 correlation_id = %ctx.trace_id,
2608 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2609 range_start = range.start,
2610 range_end = range.end,
2611 "Processing single-range request"
2612 );
2613 }
2614 pingora_proxy::RangeType::Multi(multi) => {
2615 trace!(
2616 correlation_id = %ctx.trace_id,
2617 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2618 range_count = multi.ranges.len(),
2619 "Processing multi-range request"
2620 );
2621 }
2622 pingora_proxy::RangeType::Invalid => {
2623 debug!(
2624 correlation_id = %ctx.trace_id,
2625 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2626 "Invalid range header"
2627 );
2628 }
2629 }
2630
2631 range_type
2632 }
2633
2634 async fn fail_to_proxy(
2637 &self,
2638 session: &mut Session,
2639 e: &Error,
2640 ctx: &mut Self::CTX,
2641 ) -> pingora_proxy::FailToProxy
2642 where
2643 Self::CTX: Send + Sync,
2644 {
2645 let error_code = match e.etype() {
2646 ErrorType::ConnectRefused => 503,
2648 ErrorType::ConnectTimedout => 504,
2649 ErrorType::ConnectNoRoute => 502,
2650
2651 ErrorType::ReadTimedout => 504,
2653 ErrorType::WriteTimedout => 504,
2654
2655 ErrorType::TLSHandshakeFailure => 502,
2657 ErrorType::InvalidCert => 502,
2658
2659 ErrorType::InvalidHTTPHeader => 400,
2661 ErrorType::H2Error => 502,
2662
2663 ErrorType::ConnectProxyFailure => 502,
2665 ErrorType::ConnectionClosed => 502,
2666
2667 ErrorType::HTTPStatus(status) => *status,
2669
2670 ErrorType::InternalError => {
2672 let error_str = e.to_string();
2674 if error_str.contains("upstream")
2675 || error_str.contains("DNS")
2676 || error_str.contains("resolve")
2677 {
2678 502
2679 } else {
2680 500
2681 }
2682 }
2683
2684 _ => 502,
2686 };
2687
2688 error!(
2689 correlation_id = %ctx.trace_id,
2690 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2691 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2692 error_type = ?e.etype(),
2693 error = %e,
2694 error_code = error_code,
2695 "Proxy error occurred"
2696 );
2697
2698 self.metrics
2700 .record_blocked_request(&format!("proxy_error_{}", error_code));
2701
2702 let error_message = match error_code {
2706 400 => "Bad Request",
2707 502 => "Bad Gateway",
2708 503 => "Service Unavailable",
2709 504 => "Gateway Timeout",
2710 _ => "Internal Server Error",
2711 };
2712
2713 let body = format!(
2715 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2716 error_code, error_message, ctx.trace_id
2717 );
2718
2719 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2721 header
2722 .insert_header("Content-Type", "application/json")
2723 .ok();
2724 header
2725 .insert_header("Content-Length", body.len().to_string())
2726 .ok();
2727 header
2728 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2729 .ok();
2730 header.insert_header("Connection", "close").ok();
2731
2732 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2734 warn!(
2735 correlation_id = %ctx.trace_id,
2736 error = %write_err,
2737 "Failed to write error response header"
2738 );
2739 } else {
2740 if let Err(write_err) = session
2742 .write_response_body(Some(bytes::Bytes::from(body)), true)
2743 .await
2744 {
2745 warn!(
2746 correlation_id = %ctx.trace_id,
2747 error = %write_err,
2748 "Failed to write error response body"
2749 );
2750 }
2751 }
2752
2753 pingora_proxy::FailToProxy {
2756 error_code,
2757 can_reuse_downstream: false,
2758 }
2759 }
2760
2761 fn error_while_proxy(
2767 &self,
2768 peer: &HttpPeer,
2769 session: &mut Session,
2770 e: Box<Error>,
2771 ctx: &mut Self::CTX,
2772 client_reused: bool,
2773 ) -> Box<Error> {
2774 let error_type = e.etype().clone();
2775 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2776
2777 let is_retryable = matches!(
2779 error_type,
2780 ErrorType::ConnectTimedout
2781 | ErrorType::ReadTimedout
2782 | ErrorType::WriteTimedout
2783 | ErrorType::ConnectionClosed
2784 | ErrorType::ConnectRefused
2785 );
2786
2787 warn!(
2789 correlation_id = %ctx.trace_id,
2790 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2791 upstream = %upstream_id,
2792 peer_address = %peer.address(),
2793 error_type = ?error_type,
2794 error = %e,
2795 client_reused = client_reused,
2796 is_retryable = is_retryable,
2797 "Error during proxy operation"
2798 );
2799
2800 let peer_address = peer.address().to_string();
2803 let upstream_pools = self.upstream_pools.clone();
2804 let upstream_id_owned = upstream_id.to_string();
2805 tokio::spawn(async move {
2806 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2807 pool.report_result(&peer_address, false).await;
2808 }
2809 });
2810
2811 self.metrics
2813 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2814
2815 let mut enhanced_error = e.more_context(format!(
2817 "Upstream: {}, Peer: {}, Attempts: {}",
2818 upstream_id,
2819 peer.address(),
2820 ctx.upstream_attempts
2821 ));
2822
2823 if is_retryable {
2828 let can_retry = if client_reused {
2829 !session.as_ref().retry_buffer_truncated()
2831 } else {
2832 true
2834 };
2835
2836 enhanced_error.retry.decide_reuse(can_retry);
2837
2838 if can_retry {
2839 debug!(
2840 correlation_id = %ctx.trace_id,
2841 upstream = %upstream_id,
2842 error_type = ?error_type,
2843 "Error is retryable, will attempt retry"
2844 );
2845 }
2846 } else {
2847 enhanced_error.retry.decide_reuse(false);
2849 }
2850
2851 enhanced_error
2852 }
2853
2854 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2855 self.reload_coordinator.dec_requests();
2857
2858 if !ctx.shadow_sent {
2860 if let Some(shadow_pending) = ctx.shadow_pending.take() {
2861 let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2862 Some(ctx.body_buffer.clone())
2864 } else {
2865 None
2866 };
2867
2868 trace!(
2869 correlation_id = %ctx.trace_id,
2870 body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2871 "Firing deferred shadow request with buffered body"
2872 );
2873
2874 shadow_pending.manager.shadow_request(
2875 shadow_pending.headers,
2876 body,
2877 shadow_pending.request_ctx,
2878 );
2879 ctx.shadow_sent = true;
2880 }
2881 }
2882
2883 let duration = ctx.elapsed();
2884
2885 let status = session
2887 .response_written()
2888 .map(|r| r.status.as_u16())
2889 .unwrap_or(0);
2890
2891 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2894 (&ctx.selected_upstream_address, &ctx.upstream)
2895 {
2896 let success = status > 0 && status < 500;
2898
2899 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2900 pool.report_result_with_latency(peer_addr, success, Some(duration))
2901 .await;
2902 trace!(
2903 correlation_id = %ctx.trace_id,
2904 upstream = %upstream_id,
2905 peer_address = %peer_addr,
2906 success = success,
2907 duration_ms = duration.as_millis(),
2908 status = status,
2909 "Reported result to adaptive load balancer"
2910 );
2911 }
2912
2913 if ctx.inference_rate_limit_enabled && success {
2915 let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2916 if cold_detected {
2917 debug!(
2918 correlation_id = %ctx.trace_id,
2919 upstream = %upstream_id,
2920 peer_address = %peer_addr,
2921 duration_ms = duration.as_millis(),
2922 "Cold model detected on inference upstream"
2923 );
2924 }
2925 }
2926 }
2927
2928 if ctx.inference_rate_limit_enabled {
2931 if let (Some(route_id), Some(ref rate_limit_key)) =
2932 (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2933 {
2934 let response_headers = session
2936 .response_written()
2937 .map(|r| &r.headers)
2938 .cloned()
2939 .unwrap_or_default();
2940
2941 let streaming_result = if ctx.inference_streaming_response {
2943 ctx.inference_streaming_counter
2944 .as_ref()
2945 .map(|counter| counter.finalize())
2946 } else {
2947 None
2948 };
2949
2950 if let Some(ref result) = streaming_result {
2952 debug!(
2953 correlation_id = %ctx.trace_id,
2954 output_tokens = result.output_tokens,
2955 input_tokens = ?result.input_tokens,
2956 source = ?result.source,
2957 content_length = result.content_length,
2958 "Finalized streaming token count"
2959 );
2960 }
2961
2962 if ctx.inference_streaming_response {
2964 if let Some(ref route_config) = ctx.route_config {
2965 if let Some(ref inference) = route_config.inference {
2966 if let Some(ref guardrails) = inference.guardrails {
2967 if let Some(ref pii_config) = guardrails.pii_detection {
2968 if pii_config.enabled {
2969 if let Some(ref counter) = ctx.inference_streaming_counter {
2971 let response_content = counter.content();
2972 if !response_content.is_empty() {
2973 let pii_result = self
2974 .guardrail_processor
2975 .check_pii(
2976 pii_config,
2977 response_content,
2978 ctx.route_id.as_deref(),
2979 &ctx.trace_id,
2980 )
2981 .await;
2982
2983 match pii_result {
2984 crate::inference::PiiCheckResult::Detected {
2985 detections,
2986 redacted_content: _,
2987 } => {
2988 warn!(
2989 correlation_id = %ctx.trace_id,
2990 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2991 detection_count = detections.len(),
2992 "PII detected in inference response"
2993 );
2994
2995 ctx.pii_detection_categories = detections
2997 .iter()
2998 .map(|d| d.category.clone())
2999 .collect();
3000
3001 for detection in &detections {
3003 self.metrics.record_pii_detected(
3004 ctx.route_id.as_deref().unwrap_or("unknown"),
3005 &detection.category,
3006 );
3007 }
3008 }
3009 crate::inference::PiiCheckResult::Clean => {
3010 trace!(
3011 correlation_id = %ctx.trace_id,
3012 "No PII detected in response"
3013 );
3014 }
3015 crate::inference::PiiCheckResult::Error { message } => {
3016 debug!(
3017 correlation_id = %ctx.trace_id,
3018 error = %message,
3019 "PII detection check failed"
3020 );
3021 }
3022 }
3023 }
3024 }
3025 }
3026 }
3027 }
3028 }
3029 }
3030 }
3031
3032 let empty_body: &[u8] = &[];
3036
3037 if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3038 route_id,
3039 rate_limit_key,
3040 &response_headers,
3041 empty_body,
3042 ctx.inference_estimated_tokens,
3043 ) {
3044 let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
3046 if streaming.total_tokens.is_some() {
3048 (streaming.total_tokens.unwrap(), "streaming_api")
3049 } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
3050 let total = ctx.inference_input_tokens + streaming.output_tokens;
3053 (total, "streaming_tiktoken")
3054 } else {
3055 (actual_estimate.tokens, "headers")
3056 }
3057 } else {
3058 (actual_estimate.tokens, "headers")
3059 };
3060
3061 ctx.inference_actual_tokens = Some(actual_tokens);
3062
3063 debug!(
3064 correlation_id = %ctx.trace_id,
3065 route_id = route_id,
3066 estimated_tokens = ctx.inference_estimated_tokens,
3067 actual_tokens = actual_tokens,
3068 source = source_info,
3069 streaming_response = ctx.inference_streaming_response,
3070 model = ?ctx.inference_model,
3071 "Recorded actual inference tokens"
3072 );
3073
3074 if ctx.inference_budget_enabled {
3076 let alerts = self.inference_rate_limit_manager.record_budget(
3077 route_id,
3078 rate_limit_key,
3079 actual_tokens,
3080 );
3081
3082 for alert in alerts.iter() {
3084 warn!(
3085 correlation_id = %ctx.trace_id,
3086 route_id = route_id,
3087 tenant = %alert.tenant,
3088 threshold_pct = alert.threshold * 100.0,
3089 tokens_used = alert.tokens_used,
3090 tokens_limit = alert.tokens_limit,
3091 "Token budget alert threshold crossed"
3092 );
3093 }
3094
3095 if let Some(status) = self.inference_rate_limit_manager.budget_status(
3097 route_id,
3098 rate_limit_key,
3099 ) {
3100 ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3101 }
3102 }
3103
3104 if ctx.inference_cost_enabled {
3106 if let Some(model) = ctx.inference_model.as_deref() {
3107 let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3109 let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3111 let output = streaming.output_tokens;
3112 (input, output)
3113 } else {
3114 let input = ctx.inference_input_tokens;
3116 let output = actual_tokens.saturating_sub(input);
3117 (input, output)
3118 };
3119 ctx.inference_output_tokens = output_tokens;
3120
3121 if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3122 route_id,
3123 model,
3124 input_tokens,
3125 output_tokens,
3126 ) {
3127 ctx.inference_request_cost = Some(cost_result.total_cost);
3128
3129 trace!(
3130 correlation_id = %ctx.trace_id,
3131 route_id = route_id,
3132 model = model,
3133 input_tokens = input_tokens,
3134 output_tokens = output_tokens,
3135 total_cost = cost_result.total_cost,
3136 currency = %cost_result.currency,
3137 "Calculated inference request cost"
3138 );
3139 }
3140 }
3141 }
3142 }
3143 }
3144 }
3145
3146 if self.log_manager.should_log_access(status) {
3148 let access_entry = AccessLogEntry {
3149 timestamp: chrono::Utc::now().to_rfc3339(),
3150 trace_id: ctx.trace_id.clone(),
3151 method: ctx.method.clone(),
3152 path: ctx.path.clone(),
3153 query: ctx.query.clone(),
3154 protocol: "HTTP/1.1".to_string(),
3155 status,
3156 body_bytes: ctx.response_bytes,
3157 duration_ms: duration.as_millis() as u64,
3158 client_ip: ctx.client_ip.clone(),
3159 user_agent: ctx.user_agent.clone(),
3160 referer: ctx.referer.clone(),
3161 host: ctx.host.clone(),
3162 route_id: ctx.route_id.clone(),
3163 upstream: ctx.upstream.clone(),
3164 upstream_attempts: ctx.upstream_attempts,
3165 instance_id: self.app_state.instance_id.clone(),
3166 namespace: ctx.namespace.clone(),
3167 service: ctx.service.clone(),
3168 body_bytes_sent: ctx.response_bytes,
3170 upstream_addr: ctx.selected_upstream_address.clone(),
3171 connection_reused: ctx.connection_reused,
3172 rate_limit_hit: status == 429,
3173 geo_country: ctx.geo_country_code.clone(),
3174 };
3175 self.log_manager.log_access(&access_entry);
3176 }
3177
3178 if tracing::enabled!(tracing::Level::DEBUG) {
3180 debug!(
3181 trace_id = %ctx.trace_id,
3182 method = %ctx.method,
3183 path = %ctx.path,
3184 route_id = ?ctx.route_id,
3185 upstream = ?ctx.upstream,
3186 status = status,
3187 duration_ms = duration.as_millis() as u64,
3188 upstream_attempts = ctx.upstream_attempts,
3189 error = ?_error.map(|e| e.to_string()),
3190 "Request completed"
3191 );
3192 }
3193
3194 if ctx.is_websocket_upgrade && status == 101 {
3196 info!(
3197 trace_id = %ctx.trace_id,
3198 route_id = ?ctx.route_id,
3199 upstream = ?ctx.upstream,
3200 client_ip = %ctx.client_ip,
3201 "WebSocket connection established"
3202 );
3203 }
3204
3205 if let Some(span) = ctx.otel_span.take() {
3207 span.end();
3208 }
3209 }
3210}
3211
3212impl SentinelProxy {
3217 async fn process_body_chunk_streaming(
3219 &self,
3220 body: &mut Option<Bytes>,
3221 end_of_stream: bool,
3222 ctx: &mut RequestContext,
3223 ) -> Result<(), Box<Error>> {
3224 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3226 let chunk_index = ctx.request_body_chunk_index;
3227 ctx.request_body_chunk_index += 1;
3228 ctx.body_bytes_inspected += chunk_data.len() as u64;
3229
3230 debug!(
3231 correlation_id = %ctx.trace_id,
3232 chunk_index = chunk_index,
3233 chunk_size = chunk_data.len(),
3234 end_of_stream = end_of_stream,
3235 "Streaming body chunk to agents"
3236 );
3237
3238 let agent_ctx = crate::agents::AgentCallContext {
3240 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3241 metadata: sentinel_agent_protocol::RequestMetadata {
3242 correlation_id: ctx.trace_id.clone(),
3243 request_id: ctx.trace_id.clone(),
3244 client_ip: ctx.client_ip.clone(),
3245 client_port: 0,
3246 server_name: ctx.host.clone(),
3247 protocol: "HTTP/1.1".to_string(),
3248 tls_version: None,
3249 tls_cipher: None,
3250 route_id: ctx.route_id.clone(),
3251 upstream_id: ctx.upstream.clone(),
3252 timestamp: chrono::Utc::now().to_rfc3339(),
3253 traceparent: ctx.traceparent(),
3254 },
3255 route_id: ctx.route_id.clone(),
3256 upstream_id: ctx.upstream.clone(),
3257 request_body: None, response_body: None,
3259 };
3260
3261 let agent_ids = ctx.body_inspection_agents.clone();
3262 let total_size = None; match self
3265 .agent_manager
3266 .process_request_body_streaming(
3267 &agent_ctx,
3268 &chunk_data,
3269 end_of_stream,
3270 chunk_index,
3271 ctx.body_bytes_inspected as usize,
3272 total_size,
3273 &agent_ids,
3274 )
3275 .await
3276 {
3277 Ok(decision) => {
3278 ctx.agent_needs_more = decision.needs_more;
3280
3281 if let Some(ref mutation) = decision.request_body_mutation {
3283 if !mutation.is_pass_through() {
3284 if mutation.is_drop() {
3285 *body = None;
3287 trace!(
3288 correlation_id = %ctx.trace_id,
3289 chunk_index = chunk_index,
3290 "Agent dropped body chunk"
3291 );
3292 } else if let Some(ref new_data) = mutation.data {
3293 *body = Some(Bytes::from(new_data.clone()));
3295 trace!(
3296 correlation_id = %ctx.trace_id,
3297 chunk_index = chunk_index,
3298 original_size = chunk_data.len(),
3299 new_size = new_data.len(),
3300 "Agent mutated body chunk"
3301 );
3302 }
3303 }
3304 }
3305
3306 if !decision.needs_more && !decision.is_allow() {
3308 warn!(
3309 correlation_id = %ctx.trace_id,
3310 action = ?decision.action,
3311 "Agent blocked request body"
3312 );
3313 self.metrics.record_blocked_request("agent_body_inspection");
3314
3315 let (status, message) = match &decision.action {
3316 crate::agents::AgentAction::Block { status, body, .. } => (
3317 *status,
3318 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3319 ),
3320 _ => (403, "Forbidden".to_string()),
3321 };
3322
3323 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3324 }
3325
3326 trace!(
3327 correlation_id = %ctx.trace_id,
3328 needs_more = decision.needs_more,
3329 "Agent processed body chunk"
3330 );
3331 }
3332 Err(e) => {
3333 let fail_closed = ctx
3334 .route_config
3335 .as_ref()
3336 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3337 .unwrap_or(false);
3338
3339 if fail_closed {
3340 error!(
3341 correlation_id = %ctx.trace_id,
3342 error = %e,
3343 "Agent streaming body inspection failed, blocking (fail-closed)"
3344 );
3345 return Err(Error::explain(
3346 ErrorType::HTTPStatus(503),
3347 "Service unavailable",
3348 ));
3349 } else {
3350 warn!(
3351 correlation_id = %ctx.trace_id,
3352 error = %e,
3353 "Agent streaming body inspection failed, allowing (fail-open)"
3354 );
3355 }
3356 }
3357 }
3358
3359 Ok(())
3360 }
3361
3362 async fn send_buffered_body_to_agents(
3364 &self,
3365 end_of_stream: bool,
3366 ctx: &mut RequestContext,
3367 ) -> Result<(), Box<Error>> {
3368 debug!(
3369 correlation_id = %ctx.trace_id,
3370 buffer_size = ctx.body_buffer.len(),
3371 end_of_stream = end_of_stream,
3372 agent_count = ctx.body_inspection_agents.len(),
3373 decompression_enabled = ctx.decompression_enabled,
3374 "Sending buffered body to agents for inspection"
3375 );
3376
3377 let body_for_inspection = if ctx.decompression_enabled {
3379 if let Some(ref encoding) = ctx.body_content_encoding {
3380 let config = crate::decompression::DecompressionConfig {
3381 max_ratio: ctx.max_decompression_ratio,
3382 max_output_bytes: ctx.max_decompression_bytes,
3383 };
3384
3385 match crate::decompression::decompress_body(
3386 &ctx.body_buffer,
3387 encoding,
3388 &config,
3389 ) {
3390 Ok(result) => {
3391 ctx.body_was_decompressed = true;
3392 self.metrics
3393 .record_decompression_success(encoding, result.ratio);
3394 debug!(
3395 correlation_id = %ctx.trace_id,
3396 encoding = %encoding,
3397 compressed_size = result.compressed_size,
3398 decompressed_size = result.decompressed_size,
3399 ratio = result.ratio,
3400 "Body decompressed for agent inspection"
3401 );
3402 result.data
3403 }
3404 Err(e) => {
3405 let failure_reason = match &e {
3407 crate::decompression::DecompressionError::RatioExceeded { .. } => {
3408 "ratio_exceeded"
3409 }
3410 crate::decompression::DecompressionError::SizeExceeded { .. } => {
3411 "size_exceeded"
3412 }
3413 crate::decompression::DecompressionError::InvalidData { .. } => {
3414 "invalid_data"
3415 }
3416 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3417 "unsupported"
3418 }
3419 crate::decompression::DecompressionError::IoError(_) => "io_error",
3420 };
3421 self.metrics
3422 .record_decompression_failure(encoding, failure_reason);
3423
3424 let fail_closed = ctx
3426 .route_config
3427 .as_ref()
3428 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3429 .unwrap_or(false);
3430
3431 if fail_closed {
3432 error!(
3433 correlation_id = %ctx.trace_id,
3434 error = %e,
3435 encoding = %encoding,
3436 "Decompression failed, blocking (fail-closed)"
3437 );
3438 return Err(Error::explain(
3439 ErrorType::HTTPStatus(400),
3440 "Invalid compressed body",
3441 ));
3442 } else {
3443 warn!(
3444 correlation_id = %ctx.trace_id,
3445 error = %e,
3446 encoding = %encoding,
3447 "Decompression failed, sending compressed body (fail-open)"
3448 );
3449 ctx.body_buffer.clone()
3450 }
3451 }
3452 }
3453 } else {
3454 ctx.body_buffer.clone()
3455 }
3456 } else {
3457 ctx.body_buffer.clone()
3458 };
3459
3460 let agent_ctx = crate::agents::AgentCallContext {
3461 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3462 metadata: sentinel_agent_protocol::RequestMetadata {
3463 correlation_id: ctx.trace_id.clone(),
3464 request_id: ctx.trace_id.clone(),
3465 client_ip: ctx.client_ip.clone(),
3466 client_port: 0,
3467 server_name: ctx.host.clone(),
3468 protocol: "HTTP/1.1".to_string(),
3469 tls_version: None,
3470 tls_cipher: None,
3471 route_id: ctx.route_id.clone(),
3472 upstream_id: ctx.upstream.clone(),
3473 timestamp: chrono::Utc::now().to_rfc3339(),
3474 traceparent: ctx.traceparent(),
3475 },
3476 route_id: ctx.route_id.clone(),
3477 upstream_id: ctx.upstream.clone(),
3478 request_body: Some(body_for_inspection.clone()),
3479 response_body: None,
3480 };
3481
3482 let agent_ids = ctx.body_inspection_agents.clone();
3483 match self
3484 .agent_manager
3485 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3486 .await
3487 {
3488 Ok(decision) => {
3489 if !decision.is_allow() {
3490 warn!(
3491 correlation_id = %ctx.trace_id,
3492 action = ?decision.action,
3493 "Agent blocked request body"
3494 );
3495 self.metrics.record_blocked_request("agent_body_inspection");
3496
3497 let (status, message) = match &decision.action {
3498 crate::agents::AgentAction::Block { status, body, .. } => (
3499 *status,
3500 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3501 ),
3502 _ => (403, "Forbidden".to_string()),
3503 };
3504
3505 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3506 }
3507
3508 trace!(
3509 correlation_id = %ctx.trace_id,
3510 "Agent allowed request body"
3511 );
3512 }
3513 Err(e) => {
3514 let fail_closed = ctx
3515 .route_config
3516 .as_ref()
3517 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3518 .unwrap_or(false);
3519
3520 if fail_closed {
3521 error!(
3522 correlation_id = %ctx.trace_id,
3523 error = %e,
3524 "Agent body inspection failed, blocking (fail-closed)"
3525 );
3526 return Err(Error::explain(
3527 ErrorType::HTTPStatus(503),
3528 "Service unavailable",
3529 ));
3530 } else {
3531 warn!(
3532 correlation_id = %ctx.trace_id,
3533 error = %e,
3534 "Agent body inspection failed, allowing (fail-open)"
3535 );
3536 }
3537 }
3538 }
3539
3540 Ok(())
3541 }
3542}