1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23 extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39 fn get_header(&self, _name: &str) -> Option<String> {
40 None
41 }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46 type CTX = RequestContext;
47
48 fn new_ctx(&self) -> Self::CTX {
49 RequestContext::new()
50 }
51
52 fn fail_to_connect(
53 &self,
54 _session: &mut Session,
55 peer: &HttpPeer,
56 ctx: &mut Self::CTX,
57 e: Box<Error>,
58 ) -> Box<Error> {
59 error!(
60 correlation_id = %ctx.trace_id,
61 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63 peer_address = %peer.address(),
64 error = %e,
65 "Failed to connect to upstream peer"
66 );
67 e
69 }
70
71 async fn early_request_filter(
74 &self,
75 session: &mut Session,
76 ctx: &mut Self::CTX,
77 ) -> Result<(), Box<Error>> {
78 let req_header = session.req_header();
80 let method = req_header.method.as_str();
81 let path = req_header.uri.path();
82 let host = req_header
83 .headers
84 .get("host")
85 .and_then(|h| h.to_str().ok())
86 .unwrap_or("");
87
88 ctx.method = method.to_string();
89 ctx.path = path.to_string();
90 ctx.host = Some(host.to_string());
91
92 let route_match = {
94 let route_matcher = self.route_matcher.read();
95 let request_info = RequestInfo::new(method, path, host);
96 match route_matcher.match_request(&request_info) {
97 Some(m) => m,
98 None => return Ok(()), }
100 };
101
102 ctx.trace_id = self.get_trace_id(session);
103 ctx.route_id = Some(route_match.route_id.to_string());
104 ctx.route_config = Some(route_match.config.clone());
105
106 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
108 if let Ok(s) = traceparent.to_str() {
109 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
110 }
111 }
112
113 if let Some(tracer) = crate::otel::get_tracer() {
115 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
116 }
117
118 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
120 trace!(
121 correlation_id = %ctx.trace_id,
122 route_id = %route_match.route_id,
123 builtin_handler = ?route_match.config.builtin_handler,
124 "Handling builtin route in early_request_filter"
125 );
126
127 let handled = self
129 .handle_builtin_route(session, ctx, &route_match)
130 .await?;
131
132 if handled {
133 return Err(Error::explain(
135 ErrorType::InternalError,
136 "Builtin handler complete",
137 ));
138 }
139 }
140
141 Ok(())
142 }
143
144 async fn upstream_peer(
145 &self,
146 session: &mut Session,
147 ctx: &mut Self::CTX,
148 ) -> Result<Box<HttpPeer>, Box<Error>> {
149 self.reload_coordinator.inc_requests();
151
152 if ctx.config.is_none() {
154 ctx.config = Some(self.config_manager.current());
155 }
156
157 if ctx.client_ip.is_empty() {
159 ctx.client_ip = session
160 .client_addr()
161 .map(|a| a.to_string())
162 .unwrap_or_else(|| "unknown".to_string());
163 }
164
165 let req_header = session.req_header();
166
167 if ctx.method.is_empty() {
169 ctx.method = req_header.method.to_string();
170 ctx.path = req_header.uri.path().to_string();
171 ctx.query = req_header.uri.query().map(|q| q.to_string());
172 ctx.host = req_header
173 .headers
174 .get("host")
175 .and_then(|v| v.to_str().ok())
176 .map(|s| s.to_string());
177 }
178 ctx.user_agent = req_header
179 .headers
180 .get("user-agent")
181 .and_then(|v| v.to_str().ok())
182 .map(|s| s.to_string());
183 ctx.referer = req_header
184 .headers
185 .get("referer")
186 .and_then(|v| v.to_str().ok())
187 .map(|s| s.to_string());
188
189 trace!(
190 correlation_id = %ctx.trace_id,
191 client_ip = %ctx.client_ip,
192 "Request received, initializing context"
193 );
194
195 let route_match = if let Some(ref route_config) = ctx.route_config {
197 let route_id = ctx.route_id.as_deref().unwrap_or("");
198 crate::routing::RouteMatch {
199 route_id: sentinel_common::RouteId::new(route_id),
200 config: route_config.clone(),
201 }
202 } else {
203 let (match_result, route_duration) = {
205 let route_matcher = self.route_matcher.read();
206 let host = ctx.host.as_deref().unwrap_or("");
207
208 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
210
211 if route_matcher.needs_headers() {
213 request_info = request_info
214 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
215 }
216
217 if route_matcher.needs_query_params() {
219 request_info =
220 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
221 }
222
223 trace!(
224 correlation_id = %ctx.trace_id,
225 method = %request_info.method,
226 path = %request_info.path,
227 host = %request_info.host,
228 "Built request info for route matching"
229 );
230
231 let route_start = std::time::Instant::now();
232 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
233 warn!(
234 correlation_id = %ctx.trace_id,
235 method = %request_info.method,
236 path = %request_info.path,
237 host = %request_info.host,
238 "No matching route found for request"
239 );
240 Error::explain(ErrorType::InternalError, "No matching route found")
241 })?;
242 let route_duration = route_start.elapsed();
243 (route_match, route_duration)
245 };
246
247 ctx.route_id = Some(match_result.route_id.to_string());
248 ctx.route_config = Some(match_result.config.clone());
249
250 if ctx.trace_id.is_empty() {
252 ctx.trace_id = self.get_trace_id(session);
253
254 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
256 {
257 if let Ok(s) = traceparent.to_str() {
258 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
259 }
260 }
261
262 if let Some(tracer) = crate::otel::get_tracer() {
264 ctx.otel_span = Some(tracer.start_span(
265 &ctx.method,
266 &ctx.path,
267 ctx.trace_context.as_ref(),
268 ));
269 }
270 }
271
272 trace!(
273 correlation_id = %ctx.trace_id,
274 route_id = %match_result.route_id,
275 route_duration_us = route_duration.as_micros(),
276 service_type = ?match_result.config.service_type,
277 "Route matched"
278 );
279 match_result
280 };
281
282 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
284 trace!(
285 correlation_id = %ctx.trace_id,
286 route_id = %route_match.route_id,
287 builtin_handler = ?route_match.config.builtin_handler,
288 "Route type is builtin, skipping upstream"
289 );
290 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
292 return Err(Error::explain(
294 ErrorType::InternalError,
295 "Builtin handler handled in request_filter",
296 ));
297 }
298
299 if route_match.config.service_type == sentinel_config::ServiceType::Static {
301 trace!(
302 correlation_id = %ctx.trace_id,
303 route_id = %route_match.route_id,
304 "Route type is static, checking for static server"
305 );
306 if self
308 .static_servers
309 .get(route_match.route_id.as_str())
310 .await
311 .is_some()
312 {
313 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
315 info!(
316 correlation_id = %ctx.trace_id,
317 route_id = %route_match.route_id,
318 path = %ctx.path,
319 "Serving static file"
320 );
321 return Err(Error::explain(
323 ErrorType::InternalError,
324 "Static file serving handled in request_filter",
325 ));
326 }
327 }
328
329 let mut model_routing_applied = false;
332 if let Some(ref inference) = route_match.config.inference {
333 if let Some(ref model_routing) = inference.model_routing {
334 let model = model_routing::extract_model_from_headers(&req_header.headers);
336
337 if let Some(ref model_name) = model {
338 if let Some(routing_result) =
340 model_routing::find_upstream_for_model(model_routing, model_name)
341 {
342 debug!(
343 correlation_id = %ctx.trace_id,
344 route_id = %route_match.route_id,
345 model = %model_name,
346 upstream = %routing_result.upstream,
347 is_default = routing_result.is_default,
348 provider_override = ?routing_result.provider,
349 "Model-based routing selected upstream"
350 );
351
352 ctx.record_model_routing(
353 &routing_result.upstream,
354 Some(model_name.clone()),
355 routing_result.provider,
356 );
357 model_routing_applied = true;
358
359 if let Some(metrics) = get_model_routing_metrics() {
361 metrics.record_model_routed(
362 route_match.route_id.as_str(),
363 model_name,
364 &routing_result.upstream,
365 );
366 if routing_result.is_default {
367 metrics.record_default_upstream(route_match.route_id.as_str());
368 }
369 if let Some(provider) = routing_result.provider {
370 metrics.record_provider_override(
371 route_match.route_id.as_str(),
372 &routing_result.upstream,
373 provider.as_str(),
374 );
375 }
376 }
377 }
378 } else if let Some(ref default_upstream) = model_routing.default_upstream {
379 debug!(
381 correlation_id = %ctx.trace_id,
382 route_id = %route_match.route_id,
383 upstream = %default_upstream,
384 "Model-based routing using default upstream (no model header)"
385 );
386 ctx.record_model_routing(default_upstream, None, None);
387 model_routing_applied = true;
388
389 if let Some(metrics) = get_model_routing_metrics() {
391 metrics.record_no_model_header(route_match.route_id.as_str());
392 }
393 }
394 }
395 }
396
397 if !model_routing_applied {
399 if let Some(ref upstream) = route_match.config.upstream {
400 ctx.upstream = Some(upstream.clone());
401 trace!(
402 correlation_id = %ctx.trace_id,
403 route_id = %route_match.route_id,
404 upstream = %upstream,
405 "Upstream configured for route"
406 );
407 } else {
408 error!(
409 correlation_id = %ctx.trace_id,
410 route_id = %route_match.route_id,
411 "Route has no upstream configured"
412 );
413 return Err(Error::explain(
414 ErrorType::InternalError,
415 format!(
416 "Route '{}' has no upstream configured",
417 route_match.route_id
418 ),
419 ));
420 }
421 }
422
423 if let Some(ref fallback_config) = route_match.config.fallback {
426 let upstream_name = ctx.upstream.as_ref().unwrap();
427
428 let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
430 pool.has_healthy_targets().await
431 } else {
432 false };
434
435 let is_budget_exhausted = ctx.inference_budget_exhausted;
437
438 let current_model = ctx.inference_model.as_deref();
440
441 let evaluator = FallbackEvaluator::new(
443 fallback_config,
444 ctx.tried_upstreams(),
445 ctx.fallback_attempt,
446 );
447
448 if let Some(decision) = evaluator.should_fallback_before_request(
450 upstream_name,
451 is_healthy,
452 is_budget_exhausted,
453 current_model,
454 ) {
455 info!(
456 correlation_id = %ctx.trace_id,
457 route_id = %route_match.route_id,
458 from_upstream = %upstream_name,
459 to_upstream = %decision.next_upstream,
460 reason = %decision.reason,
461 fallback_attempt = ctx.fallback_attempt + 1,
462 "Triggering fallback routing"
463 );
464
465 if let Some(metrics) = get_fallback_metrics() {
467 metrics.record_fallback_attempt(
468 route_match.route_id.as_str(),
469 upstream_name,
470 &decision.next_upstream,
471 &decision.reason,
472 );
473 }
474
475 ctx.record_fallback(decision.reason, &decision.next_upstream);
477
478 if let Some((original, mapped)) = decision.model_mapping {
480 if let Some(metrics) = get_fallback_metrics() {
482 metrics.record_model_mapping(
483 route_match.route_id.as_str(),
484 &original,
485 &mapped,
486 );
487 }
488
489 ctx.record_model_mapping(original, mapped);
490 trace!(
491 correlation_id = %ctx.trace_id,
492 original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
493 mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
494 "Applied model mapping for fallback"
495 );
496 }
497 }
498 }
499
500 debug!(
501 correlation_id = %ctx.trace_id,
502 route_id = %route_match.route_id,
503 upstream = ?ctx.upstream,
504 method = %req_header.method,
505 path = %req_header.uri.path(),
506 host = ctx.host.as_deref().unwrap_or("-"),
507 client_ip = %ctx.client_ip,
508 "Processing request"
509 );
510
511 if ctx
513 .upstream
514 .as_ref()
515 .is_some_and(|u| u.starts_with("_static_"))
516 {
517 return Err(Error::explain(
519 ErrorType::InternalError,
520 "Static route should be handled in request_filter",
521 ));
522 }
523
524 let upstream_name = ctx
525 .upstream
526 .as_ref()
527 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
528
529 trace!(
530 correlation_id = %ctx.trace_id,
531 upstream = %upstream_name,
532 "Looking up upstream pool"
533 );
534
535 let pool = self
536 .upstream_pools
537 .get(upstream_name)
538 .await
539 .ok_or_else(|| {
540 error!(
541 correlation_id = %ctx.trace_id,
542 upstream = %upstream_name,
543 "Upstream pool not found"
544 );
545 Error::explain(
546 ErrorType::InternalError,
547 format!("Upstream pool '{}' not found", upstream_name),
548 )
549 })?;
550
551 let max_retries = route_match
553 .config
554 .retry_policy
555 .as_ref()
556 .map(|r| r.max_attempts)
557 .unwrap_or(1);
558
559 trace!(
560 correlation_id = %ctx.trace_id,
561 upstream = %upstream_name,
562 max_retries = max_retries,
563 "Starting upstream peer selection"
564 );
565
566 let mut last_error = None;
567 let selection_start = std::time::Instant::now();
568
569 for attempt in 1..=max_retries {
570 ctx.upstream_attempts = attempt;
571
572 trace!(
573 correlation_id = %ctx.trace_id,
574 upstream = %upstream_name,
575 attempt = attempt,
576 max_retries = max_retries,
577 "Attempting to select upstream peer"
578 );
579
580 match pool.select_peer(None).await {
581 Ok(peer) => {
582 let selection_duration = selection_start.elapsed();
583 let peer_addr = peer.address().to_string();
585 ctx.selected_upstream_address = Some(peer_addr.clone());
586 debug!(
587 correlation_id = %ctx.trace_id,
588 upstream = %upstream_name,
589 peer_address = %peer_addr,
590 attempt = attempt,
591 selection_duration_us = selection_duration.as_micros(),
592 "Selected upstream peer"
593 );
594 return Ok(Box::new(peer));
595 }
596 Err(e) => {
597 warn!(
598 correlation_id = %ctx.trace_id,
599 upstream = %upstream_name,
600 attempt = attempt,
601 max_retries = max_retries,
602 error = %e,
603 "Failed to select upstream peer"
604 );
605 last_error = Some(e);
606
607 if attempt < max_retries {
608 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
610 trace!(
611 correlation_id = %ctx.trace_id,
612 backoff_ms = backoff.as_millis(),
613 "Backing off before retry"
614 );
615 sleep(backoff).await;
616 }
617 }
618 }
619 }
620
621 let selection_duration = selection_start.elapsed();
622 error!(
623 correlation_id = %ctx.trace_id,
624 upstream = %upstream_name,
625 attempts = max_retries,
626 selection_duration_ms = selection_duration.as_millis(),
627 last_error = ?last_error,
628 "All upstream selection attempts failed"
629 );
630
631 if ctx.used_fallback() {
633 if let Some(metrics) = get_fallback_metrics() {
634 metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
635 }
636 }
637
638 Err(Error::explain(
639 ErrorType::InternalError,
640 format!("All upstream attempts failed: {:?}", last_error),
641 ))
642 }
643
644 async fn request_filter(
645 &self,
646 session: &mut Session,
647 ctx: &mut Self::CTX,
648 ) -> Result<bool, Box<Error>> {
649 trace!(
650 correlation_id = %ctx.trace_id,
651 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
652 "Starting request filter phase"
653 );
654
655 if let Some(route_id) = ctx.route_id.as_deref() {
658 if self.rate_limit_manager.has_route_limiter(route_id) {
659 let rate_result = self.rate_limit_manager.check(
660 route_id,
661 &ctx.client_ip,
662 &ctx.path,
663 Option::<&NoHeaderAccessor>::None,
664 );
665
666 if rate_result.limit > 0 {
668 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
669 limit: rate_result.limit,
670 remaining: rate_result.remaining,
671 reset_at: rate_result.reset_at,
672 });
673 }
674
675 if !rate_result.allowed {
676 use sentinel_config::RateLimitAction;
677
678 match rate_result.action {
679 RateLimitAction::Reject => {
680 warn!(
681 correlation_id = %ctx.trace_id,
682 route_id = route_id,
683 client_ip = %ctx.client_ip,
684 limiter = %rate_result.limiter,
685 limit = rate_result.limit,
686 remaining = rate_result.remaining,
687 "Request rate limited"
688 );
689 self.metrics.record_blocked_request("rate_limited");
690
691 let audit_entry = AuditLogEntry::rate_limited(
693 &ctx.trace_id,
694 &ctx.method,
695 &ctx.path,
696 &ctx.client_ip,
697 &rate_result.limiter,
698 )
699 .with_route_id(route_id)
700 .with_status_code(rate_result.status_code);
701 self.log_manager.log_audit(&audit_entry);
702
703 let body = rate_result
705 .message
706 .unwrap_or_else(|| "Rate limit exceeded".to_string());
707
708 let retry_after = rate_result.reset_at.saturating_sub(
710 std::time::SystemTime::now()
711 .duration_since(std::time::UNIX_EPOCH)
712 .unwrap_or_default()
713 .as_secs(),
714 );
715 crate::http_helpers::write_rate_limit_error(
716 session,
717 rate_result.status_code,
718 &body,
719 rate_result.limit,
720 rate_result.remaining,
721 rate_result.reset_at,
722 retry_after,
723 )
724 .await?;
725 return Ok(true); }
727 RateLimitAction::LogOnly => {
728 debug!(
729 correlation_id = %ctx.trace_id,
730 route_id = route_id,
731 "Rate limit exceeded (log only mode)"
732 );
733 }
735 RateLimitAction::Delay => {
736 if let Some(delay_ms) = rate_result.suggested_delay_ms {
738 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
740
741 if actual_delay > 0 {
742 debug!(
743 correlation_id = %ctx.trace_id,
744 route_id = route_id,
745 suggested_delay_ms = delay_ms,
746 max_delay_ms = rate_result.max_delay_ms,
747 actual_delay_ms = actual_delay,
748 "Applying rate limit delay"
749 );
750
751 tokio::time::sleep(std::time::Duration::from_millis(
752 actual_delay,
753 ))
754 .await;
755 }
756 }
757 }
759 }
760 }
761 }
762 }
763
764 if let Some(route_id) = ctx.route_id.as_deref() {
767 if let Some(ref route_config) = ctx.route_config {
768 if route_config.service_type == sentinel_config::ServiceType::Inference
769 && self.inference_rate_limit_manager.has_route(route_id)
770 {
771 let headers = &session.req_header().headers;
774
775 let body = ctx.body_buffer.as_slice();
777
778 let rate_limit_key = &ctx.client_ip;
780
781 if let Some(check_result) = self.inference_rate_limit_manager.check(
782 route_id,
783 rate_limit_key,
784 headers,
785 body,
786 ) {
787 ctx.inference_rate_limit_enabled = true;
789 ctx.inference_estimated_tokens = check_result.estimated_tokens;
790 ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
791 ctx.inference_model = check_result.model.clone();
792
793 if !check_result.is_allowed() {
794 let retry_after_ms = check_result.retry_after_ms();
795 let retry_after_secs = retry_after_ms.div_ceil(1000);
796
797 warn!(
798 correlation_id = %ctx.trace_id,
799 route_id = route_id,
800 client_ip = %ctx.client_ip,
801 estimated_tokens = check_result.estimated_tokens,
802 model = ?check_result.model,
803 retry_after_ms = retry_after_ms,
804 "Inference rate limit exceeded (tokens)"
805 );
806 self.metrics.record_blocked_request("inference_rate_limited");
807
808 let audit_entry = AuditLogEntry::new(
810 &ctx.trace_id,
811 AuditEventType::RateLimitExceeded,
812 &ctx.method,
813 &ctx.path,
814 &ctx.client_ip,
815 )
816 .with_route_id(route_id)
817 .with_status_code(429)
818 .with_reason(format!(
819 "Token rate limit exceeded: estimated {} tokens, model={:?}",
820 check_result.estimated_tokens, check_result.model
821 ));
822 self.log_manager.log_audit(&audit_entry);
823
824 let body = "Token rate limit exceeded";
826 let reset_at = std::time::SystemTime::now()
827 .duration_since(std::time::UNIX_EPOCH)
828 .unwrap_or_default()
829 .as_secs()
830 + retry_after_secs;
831
832 crate::http_helpers::write_rate_limit_error(
834 session,
835 429,
836 body,
837 0, 0, reset_at,
840 retry_after_secs,
841 )
842 .await?;
843 return Ok(true); }
845
846 trace!(
847 correlation_id = %ctx.trace_id,
848 route_id = route_id,
849 estimated_tokens = check_result.estimated_tokens,
850 model = ?check_result.model,
851 "Inference rate limit check passed"
852 );
853
854 if self.inference_rate_limit_manager.has_budget(route_id) {
856 ctx.inference_budget_enabled = true;
857
858 if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
859 route_id,
860 rate_limit_key,
861 check_result.estimated_tokens,
862 ) {
863 if !budget_result.is_allowed() {
864 let retry_after_secs = budget_result.retry_after_secs();
865
866 warn!(
867 correlation_id = %ctx.trace_id,
868 route_id = route_id,
869 client_ip = %ctx.client_ip,
870 estimated_tokens = check_result.estimated_tokens,
871 retry_after_secs = retry_after_secs,
872 "Token budget exhausted"
873 );
874
875 ctx.inference_budget_exhausted = true;
876 self.metrics.record_blocked_request("budget_exhausted");
877
878 let audit_entry = AuditLogEntry::new(
880 &ctx.trace_id,
881 AuditEventType::RateLimitExceeded,
882 &ctx.method,
883 &ctx.path,
884 &ctx.client_ip,
885 )
886 .with_route_id(route_id)
887 .with_status_code(429)
888 .with_reason("Token budget exhausted".to_string());
889 self.log_manager.log_audit(&audit_entry);
890
891 let body = "Token budget exhausted";
893 let reset_at = std::time::SystemTime::now()
894 .duration_since(std::time::UNIX_EPOCH)
895 .unwrap_or_default()
896 .as_secs()
897 + retry_after_secs;
898
899 crate::http_helpers::write_rate_limit_error(
900 session,
901 429,
902 body,
903 0,
904 0,
905 reset_at,
906 retry_after_secs,
907 )
908 .await?;
909 return Ok(true);
910 }
911
912 let remaining = match &budget_result {
914 sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
915 sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
916 _ => 0,
917 };
918 ctx.inference_budget_remaining = Some(remaining);
919
920 if let Some(status) = self.inference_rate_limit_manager.budget_status(
922 route_id,
923 rate_limit_key,
924 ) {
925 ctx.inference_budget_period_reset = Some(status.period_end);
926 }
927
928 trace!(
929 correlation_id = %ctx.trace_id,
930 route_id = route_id,
931 budget_remaining = remaining,
932 "Token budget check passed"
933 );
934 }
935 }
936
937 if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
939 ctx.inference_cost_enabled = true;
940 }
941 }
942 }
943 }
944 }
945
946 if let Some(ref route_config) = ctx.route_config {
948 if let Some(ref inference) = route_config.inference {
949 if let Some(ref guardrails) = inference.guardrails {
950 if let Some(ref pi_config) = guardrails.prompt_injection {
951 if pi_config.enabled && !ctx.body_buffer.is_empty() {
952 ctx.guardrails_enabled = true;
953
954 if let Some(content) = extract_inference_content(&ctx.body_buffer) {
956 let result = self
957 .guardrail_processor
958 .check_prompt_injection(
959 pi_config,
960 &content,
961 ctx.inference_model.as_deref(),
962 ctx.route_id.as_deref(),
963 &ctx.trace_id,
964 )
965 .await;
966
967 match result {
968 PromptInjectionResult::Blocked {
969 status,
970 message,
971 detections,
972 } => {
973 warn!(
974 correlation_id = %ctx.trace_id,
975 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
976 detection_count = detections.len(),
977 "Prompt injection detected, blocking request"
978 );
979
980 self.metrics.record_blocked_request("prompt_injection");
981
982 ctx.guardrail_detection_categories = detections
984 .iter()
985 .map(|d| d.category.clone())
986 .collect();
987
988 let audit_entry = AuditLogEntry::new(
990 &ctx.trace_id,
991 AuditEventType::Blocked,
992 &ctx.method,
993 &ctx.path,
994 &ctx.client_ip,
995 )
996 .with_route_id(
997 ctx.route_id.as_deref().unwrap_or("unknown"),
998 )
999 .with_status_code(status)
1000 .with_reason("Prompt injection detected".to_string());
1001 self.log_manager.log_audit(&audit_entry);
1002
1003 crate::http_helpers::write_json_error(
1005 session,
1006 status,
1007 "prompt_injection_blocked",
1008 Some(&message),
1009 )
1010 .await?;
1011 return Ok(true);
1012 }
1013 PromptInjectionResult::Detected { detections } => {
1014 warn!(
1016 correlation_id = %ctx.trace_id,
1017 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1018 detection_count = detections.len(),
1019 "Prompt injection detected (logged only)"
1020 );
1021 ctx.guardrail_detection_categories = detections
1022 .iter()
1023 .map(|d| d.category.clone())
1024 .collect();
1025 }
1026 PromptInjectionResult::Warning { detections } => {
1027 ctx.guardrail_warning = true;
1029 ctx.guardrail_detection_categories = detections
1030 .iter()
1031 .map(|d| d.category.clone())
1032 .collect();
1033 debug!(
1034 correlation_id = %ctx.trace_id,
1035 "Prompt injection warning set"
1036 );
1037 }
1038 PromptInjectionResult::Clean => {
1039 trace!(
1040 correlation_id = %ctx.trace_id,
1041 "No prompt injection detected"
1042 );
1043 }
1044 PromptInjectionResult::Error { message } => {
1045 trace!(
1047 correlation_id = %ctx.trace_id,
1048 error = %message,
1049 "Prompt injection check error (failure mode applied)"
1050 );
1051 }
1052 }
1053 }
1054 }
1055 }
1056 }
1057 }
1058 }
1059
1060 if let Some(route_id) = ctx.route_id.as_deref() {
1062 if let Some(ref route_config) = ctx.route_config {
1063 for filter_id in &route_config.filters {
1064 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1065 ctx.geo_country_code = result.country_code.clone();
1067 ctx.geo_lookup_performed = true;
1068
1069 if !result.allowed {
1070 warn!(
1071 correlation_id = %ctx.trace_id,
1072 route_id = route_id,
1073 client_ip = %ctx.client_ip,
1074 country = ?result.country_code,
1075 filter_id = %filter_id,
1076 "Request blocked by geo filter"
1077 );
1078 self.metrics.record_blocked_request("geo_blocked");
1079
1080 let audit_entry = AuditLogEntry::new(
1082 &ctx.trace_id,
1083 AuditEventType::Blocked,
1084 &ctx.method,
1085 &ctx.path,
1086 &ctx.client_ip,
1087 )
1088 .with_route_id(route_id)
1089 .with_status_code(result.status_code)
1090 .with_reason(format!(
1091 "Geo blocked: country={}, filter={}",
1092 result.country_code.as_deref().unwrap_or("unknown"),
1093 filter_id
1094 ));
1095 self.log_manager.log_audit(&audit_entry);
1096
1097 let body = result
1099 .block_message
1100 .unwrap_or_else(|| "Access denied".to_string());
1101
1102 crate::http_helpers::write_error(
1103 session,
1104 result.status_code,
1105 &body,
1106 "text/plain",
1107 )
1108 .await?;
1109 return Ok(true); }
1111
1112 break;
1114 }
1115 }
1116 }
1117 }
1118
1119 let is_websocket_upgrade = session
1121 .req_header()
1122 .headers
1123 .get(http::header::UPGRADE)
1124 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1125 .unwrap_or(false);
1126
1127 if is_websocket_upgrade {
1128 ctx.is_websocket_upgrade = true;
1129
1130 if let Some(ref route_config) = ctx.route_config {
1132 if !route_config.websocket {
1133 warn!(
1134 correlation_id = %ctx.trace_id,
1135 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1136 client_ip = %ctx.client_ip,
1137 "WebSocket upgrade rejected: not enabled for route"
1138 );
1139
1140 self.metrics.record_blocked_request("websocket_not_enabled");
1141
1142 let audit_entry = AuditLogEntry::new(
1144 &ctx.trace_id,
1145 AuditEventType::Blocked,
1146 &ctx.method,
1147 &ctx.path,
1148 &ctx.client_ip,
1149 )
1150 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1151 .with_action("websocket_rejected")
1152 .with_reason("WebSocket not enabled for route");
1153 self.log_manager.log_audit(&audit_entry);
1154
1155 crate::http_helpers::write_error(
1157 session,
1158 403,
1159 "WebSocket not enabled for this route",
1160 "text/plain",
1161 )
1162 .await?;
1163 return Ok(true); }
1165
1166 debug!(
1167 correlation_id = %ctx.trace_id,
1168 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1169 "WebSocket upgrade request allowed"
1170 );
1171
1172 if route_config.websocket_inspection {
1174 let has_compression = session
1176 .req_header()
1177 .headers
1178 .get("Sec-WebSocket-Extensions")
1179 .and_then(|v| v.to_str().ok())
1180 .map(|s| s.contains("permessage-deflate"))
1181 .unwrap_or(false);
1182
1183 if has_compression {
1184 debug!(
1185 correlation_id = %ctx.trace_id,
1186 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1187 "WebSocket inspection skipped: permessage-deflate negotiated"
1188 );
1189 ctx.websocket_skip_inspection = true;
1190 } else {
1191 ctx.websocket_inspection_enabled = true;
1192
1193 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1195 sentinel_agent_protocol::EventType::WebSocketFrame,
1196 );
1197
1198 debug!(
1199 correlation_id = %ctx.trace_id,
1200 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1201 agent_count = ctx.websocket_inspection_agents.len(),
1202 "WebSocket frame inspection enabled"
1203 );
1204 }
1205 }
1206 }
1207 }
1208
1209 if let Some(route_config) = ctx.route_config.clone() {
1212 if route_config.service_type == sentinel_config::ServiceType::Static {
1213 trace!(
1214 correlation_id = %ctx.trace_id,
1215 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1216 "Handling static file route"
1217 );
1218 let route_match = crate::routing::RouteMatch {
1220 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1221 config: route_config.clone(),
1222 };
1223 return self.handle_static_route(session, ctx, &route_match).await;
1224 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1225 trace!(
1226 correlation_id = %ctx.trace_id,
1227 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1228 builtin_handler = ?route_config.builtin_handler,
1229 "Handling builtin route"
1230 );
1231 let route_match = crate::routing::RouteMatch {
1233 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1234 config: route_config.clone(),
1235 };
1236 return self.handle_builtin_route(session, ctx, &route_match).await;
1237 }
1238 }
1239
1240 if let Some(route_id) = ctx.route_id.clone() {
1242 if let Some(validator) = self.validators.get(&route_id).await {
1243 trace!(
1244 correlation_id = %ctx.trace_id,
1245 route_id = %route_id,
1246 "Running API schema validation"
1247 );
1248 if let Some(result) = self
1249 .validate_api_request(session, ctx, &route_id, &validator)
1250 .await?
1251 {
1252 debug!(
1253 correlation_id = %ctx.trace_id,
1254 route_id = %route_id,
1255 validation_passed = result,
1256 "API validation complete"
1257 );
1258 return Ok(result);
1259 }
1260 }
1261 }
1262
1263 let client_addr = session
1265 .client_addr()
1266 .map(|a| format!("{}", a))
1267 .unwrap_or_else(|| "unknown".to_string());
1268 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1269
1270 let req_header = session.req_header_mut();
1271
1272 req_header
1274 .insert_header("X-Correlation-Id", &ctx.trace_id)
1275 .ok();
1276 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1277
1278 let config = ctx
1280 .config
1281 .get_or_insert_with(|| self.config_manager.current());
1282
1283 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
1288 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1289 && header_count > config.limits.max_header_count
1290 {
1291 warn!(
1292 correlation_id = %ctx.trace_id,
1293 header_count = header_count,
1294 limit = config.limits.max_header_count,
1295 "Request blocked: exceeds header count limit"
1296 );
1297
1298 self.metrics.record_blocked_request("header_count_exceeded");
1299 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1300 }
1301
1302 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1304 let total_header_size: usize = req_header
1305 .headers
1306 .iter()
1307 .map(|(k, v)| k.as_str().len() + v.len())
1308 .sum();
1309
1310 if total_header_size > config.limits.max_header_size_bytes {
1311 warn!(
1312 correlation_id = %ctx.trace_id,
1313 header_size = total_header_size,
1314 limit = config.limits.max_header_size_bytes,
1315 "Request blocked: exceeds header size limit"
1316 );
1317
1318 self.metrics.record_blocked_request("header_size_exceeded");
1319 return Err(Error::explain(
1320 ErrorType::InternalError,
1321 "Headers too large",
1322 ));
1323 }
1324 }
1325
1326 trace!(
1328 correlation_id = %ctx.trace_id,
1329 "Processing request through agents"
1330 );
1331 if let Err(e) = self
1332 .process_agents(session, ctx, &client_addr, client_port)
1333 .await
1334 {
1335 if let ErrorType::HTTPStatus(status) = e.etype() {
1338 let error_msg = e.to_string();
1340 let body = error_msg
1341 .split("context:")
1342 .nth(1)
1343 .map(|s| s.trim())
1344 .unwrap_or("Request blocked");
1345 debug!(
1346 correlation_id = %ctx.trace_id,
1347 status = status,
1348 body = %body,
1349 "Sending HTTP error response for agent block"
1350 );
1351 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1352 return Ok(true); }
1354 return Err(e);
1356 }
1357
1358 trace!(
1359 correlation_id = %ctx.trace_id,
1360 "Request filter phase complete, forwarding to upstream"
1361 );
1362
1363 Ok(false) }
1365
1366 async fn request_body_filter(
1373 &self,
1374 _session: &mut Session,
1375 body: &mut Option<Bytes>,
1376 end_of_stream: bool,
1377 ctx: &mut Self::CTX,
1378 ) -> Result<(), Box<Error>> {
1379 use sentinel_config::BodyStreamingMode;
1380
1381 if ctx.is_websocket_upgrade {
1383 if let Some(ref handler) = ctx.websocket_handler {
1384 let result = handler.process_client_data(body.take()).await;
1385 match result {
1386 crate::websocket::ProcessResult::Forward(data) => {
1387 *body = data;
1388 }
1389 crate::websocket::ProcessResult::Close(reason) => {
1390 warn!(
1391 correlation_id = %ctx.trace_id,
1392 code = reason.code,
1393 reason = %reason.reason,
1394 "WebSocket connection closed by agent (client->server)"
1395 );
1396 return Err(Error::explain(
1398 ErrorType::InternalError,
1399 format!("WebSocket closed: {} {}", reason.code, reason.reason),
1400 ));
1401 }
1402 }
1403 }
1404 return Ok(());
1406 }
1407
1408 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1410 if chunk_len > 0 {
1411 ctx.request_body_bytes += chunk_len as u64;
1412
1413 trace!(
1414 correlation_id = %ctx.trace_id,
1415 chunk_size = chunk_len,
1416 total_body_bytes = ctx.request_body_bytes,
1417 end_of_stream = end_of_stream,
1418 streaming_mode = ?ctx.request_body_streaming_mode,
1419 "Processing request body chunk"
1420 );
1421
1422 let config = ctx
1424 .config
1425 .get_or_insert_with(|| self.config_manager.current());
1426 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1427 warn!(
1428 correlation_id = %ctx.trace_id,
1429 body_bytes = ctx.request_body_bytes,
1430 limit = config.limits.max_body_size_bytes,
1431 "Request body size limit exceeded"
1432 );
1433 self.metrics.record_blocked_request("body_size_exceeded");
1434 return Err(Error::explain(
1435 ErrorType::InternalError,
1436 "Request body too large",
1437 ));
1438 }
1439 }
1440
1441 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1443 let config = ctx
1444 .config
1445 .get_or_insert_with(|| self.config_manager.current());
1446 let max_inspection_bytes = config
1447 .waf
1448 .as_ref()
1449 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1450 .unwrap_or(1024 * 1024);
1451
1452 match ctx.request_body_streaming_mode {
1453 BodyStreamingMode::Stream => {
1454 if body.is_some() {
1456 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1457 .await?;
1458 } else if end_of_stream && ctx.agent_needs_more {
1459 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1461 .await?;
1462 }
1463 }
1464 BodyStreamingMode::Hybrid { buffer_threshold } => {
1465 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1467 if let Some(ref chunk) = body {
1469 let bytes_to_buffer = std::cmp::min(
1470 chunk.len(),
1471 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1472 );
1473 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1474 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1475
1476 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1478 {
1479 self.send_buffered_body_to_agents(
1481 end_of_stream && chunk.len() == bytes_to_buffer,
1482 ctx,
1483 )
1484 .await?;
1485 ctx.body_buffer.clear();
1486
1487 if bytes_to_buffer < chunk.len() {
1489 let remaining = chunk.slice(bytes_to_buffer..);
1490 let mut remaining_body = Some(remaining);
1491 self.process_body_chunk_streaming(
1492 &mut remaining_body,
1493 end_of_stream,
1494 ctx,
1495 )
1496 .await?;
1497 }
1498 }
1499 }
1500 } else {
1501 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1503 .await?;
1504 }
1505 }
1506 BodyStreamingMode::Buffer => {
1507 if let Some(ref chunk) = body {
1509 if ctx.body_bytes_inspected < max_inspection_bytes {
1510 let bytes_to_inspect = std::cmp::min(
1511 chunk.len() as u64,
1512 max_inspection_bytes - ctx.body_bytes_inspected,
1513 ) as usize;
1514
1515 ctx.body_buffer
1516 .extend_from_slice(&chunk[..bytes_to_inspect]);
1517 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1518
1519 trace!(
1520 correlation_id = %ctx.trace_id,
1521 bytes_inspected = ctx.body_bytes_inspected,
1522 max_inspection_bytes = max_inspection_bytes,
1523 buffer_size = ctx.body_buffer.len(),
1524 "Buffering body for agent inspection"
1525 );
1526 }
1527 }
1528
1529 let should_send =
1531 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1532 if should_send && !ctx.body_buffer.is_empty() {
1533 self.send_buffered_body_to_agents(end_of_stream, ctx)
1534 .await?;
1535 ctx.body_buffer.clear();
1536 }
1537 }
1538 }
1539 }
1540
1541 if end_of_stream {
1542 trace!(
1543 correlation_id = %ctx.trace_id,
1544 total_body_bytes = ctx.request_body_bytes,
1545 bytes_inspected = ctx.body_bytes_inspected,
1546 "Request body complete"
1547 );
1548 }
1549
1550 Ok(())
1551 }
1552
1553 async fn response_filter(
1554 &self,
1555 _session: &mut Session,
1556 upstream_response: &mut ResponseHeader,
1557 ctx: &mut Self::CTX,
1558 ) -> Result<(), Box<Error>> {
1559 let status = upstream_response.status.as_u16();
1560 let duration = ctx.elapsed();
1561
1562 trace!(
1563 correlation_id = %ctx.trace_id,
1564 status = status,
1565 "Starting response filter phase"
1566 );
1567
1568 if status == 101 && ctx.is_websocket_upgrade {
1570 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1571 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1573 self.agent_manager.clone(),
1574 ctx.route_id
1575 .clone()
1576 .unwrap_or_else(|| "unknown".to_string()),
1577 ctx.trace_id.clone(),
1578 ctx.client_ip.clone(),
1579 100, Some(self.metrics.clone()),
1581 );
1582
1583 let handler = crate::websocket::WebSocketHandler::new(
1584 std::sync::Arc::new(inspector),
1585 1024 * 1024, );
1587
1588 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1589
1590 info!(
1591 correlation_id = %ctx.trace_id,
1592 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1593 agent_count = ctx.websocket_inspection_agents.len(),
1594 "WebSocket upgrade successful, frame inspection enabled"
1595 );
1596 } else if ctx.websocket_skip_inspection {
1597 debug!(
1598 correlation_id = %ctx.trace_id,
1599 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1600 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1601 );
1602 } else {
1603 debug!(
1604 correlation_id = %ctx.trace_id,
1605 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1606 "WebSocket upgrade successful"
1607 );
1608 }
1609 }
1610
1611 trace!(
1613 correlation_id = %ctx.trace_id,
1614 "Applying security headers"
1615 );
1616 self.apply_security_headers(upstream_response).ok();
1617
1618 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1620
1621 if let Some(ref rate_info) = ctx.rate_limit_info {
1623 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1624 upstream_response
1625 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1626 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1627 }
1628
1629 if ctx.inference_budget_enabled {
1631 if let Some(remaining) = ctx.inference_budget_remaining {
1632 upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1633 }
1634 if let Some(period_reset) = ctx.inference_budget_period_reset {
1635 let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1637 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1638 .unwrap_or_else(|| period_reset.to_string());
1639 upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1640 }
1641 }
1642
1643 if let Some(ref country_code) = ctx.geo_country_code {
1645 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1646 }
1647
1648 if ctx.guardrail_warning {
1650 upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1651 }
1652
1653 if ctx.used_fallback() {
1655 upstream_response.insert_header("X-Fallback-Used", "true")?;
1656
1657 if let Some(ref upstream) = ctx.upstream {
1658 upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1659 }
1660
1661 if let Some(ref reason) = ctx.fallback_reason {
1662 upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1663 }
1664
1665 if let Some(ref original) = ctx.original_upstream {
1666 upstream_response.insert_header("X-Original-Upstream", original)?;
1667 }
1668
1669 if let Some(ref mapping) = ctx.model_mapping_applied {
1670 upstream_response.insert_header(
1671 "X-Model-Mapping",
1672 format!("{} -> {}", mapping.0, mapping.1),
1673 )?;
1674 }
1675
1676 trace!(
1677 correlation_id = %ctx.trace_id,
1678 fallback_attempt = ctx.fallback_attempt,
1679 fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1680 original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1681 "Added fallback response headers"
1682 );
1683
1684 if status < 400 {
1686 if let Some(metrics) = get_fallback_metrics() {
1687 metrics.record_fallback_success(
1688 ctx.route_id.as_deref().unwrap_or("unknown"),
1689 ctx.upstream.as_deref().unwrap_or("unknown"),
1690 );
1691 }
1692 }
1693 }
1694
1695 if ctx.inference_rate_limit_enabled {
1697 let content_type = upstream_response
1699 .headers
1700 .get("content-type")
1701 .and_then(|ct| ct.to_str().ok());
1702
1703 if is_sse_response(content_type) {
1704 let provider = ctx
1706 .route_config
1707 .as_ref()
1708 .and_then(|r| r.inference.as_ref())
1709 .map(|i| i.provider.clone())
1710 .unwrap_or_default();
1711
1712 ctx.inference_streaming_response = true;
1713 ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1714 provider,
1715 ctx.inference_model.clone(),
1716 ));
1717
1718 trace!(
1719 correlation_id = %ctx.trace_id,
1720 content_type = ?content_type,
1721 model = ?ctx.inference_model,
1722 "Initialized streaming token counter for SSE response"
1723 );
1724 }
1725 }
1726
1727 if status >= 400 {
1729 trace!(
1730 correlation_id = %ctx.trace_id,
1731 status = status,
1732 "Handling error response"
1733 );
1734 self.handle_error_response(upstream_response, ctx).await?;
1735 }
1736
1737 self.metrics.record_request(
1739 ctx.route_id.as_deref().unwrap_or("unknown"),
1740 &ctx.method,
1741 status,
1742 duration,
1743 );
1744
1745 if let Some(ref mut span) = ctx.otel_span {
1747 span.set_status(status);
1748 if let Some(ref upstream) = ctx.upstream {
1749 span.set_upstream(upstream, "");
1750 }
1751 if status >= 500 {
1752 span.record_error(&format!("HTTP {}", status));
1753 }
1754 }
1755
1756 if let Some(ref upstream) = ctx.upstream {
1758 let success = status < 500;
1759
1760 trace!(
1761 correlation_id = %ctx.trace_id,
1762 upstream = %upstream,
1763 success = success,
1764 status = status,
1765 "Recording passive health check result"
1766 );
1767
1768 let error_msg = if !success {
1769 Some(format!("HTTP {}", status))
1770 } else {
1771 None
1772 };
1773 self.passive_health
1774 .record_outcome(upstream, success, error_msg.as_deref())
1775 .await;
1776
1777 if let Some(pool) = self.upstream_pools.get(upstream).await {
1779 pool.report_result(upstream, success).await;
1780 }
1781
1782 if !success {
1783 warn!(
1784 correlation_id = %ctx.trace_id,
1785 upstream = %upstream,
1786 status = status,
1787 "Upstream returned error status"
1788 );
1789 }
1790 }
1791
1792 if status >= 500 {
1794 error!(
1795 correlation_id = %ctx.trace_id,
1796 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1797 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1798 method = %ctx.method,
1799 path = %ctx.path,
1800 status = status,
1801 duration_ms = duration.as_millis(),
1802 attempts = ctx.upstream_attempts,
1803 "Request completed with server error"
1804 );
1805 } else if status >= 400 {
1806 warn!(
1807 correlation_id = %ctx.trace_id,
1808 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1809 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1810 method = %ctx.method,
1811 path = %ctx.path,
1812 status = status,
1813 duration_ms = duration.as_millis(),
1814 "Request completed with client error"
1815 );
1816 } else {
1817 debug!(
1818 correlation_id = %ctx.trace_id,
1819 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1820 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1821 method = %ctx.method,
1822 path = %ctx.path,
1823 status = status,
1824 duration_ms = duration.as_millis(),
1825 attempts = ctx.upstream_attempts,
1826 "Request completed"
1827 );
1828 }
1829
1830 Ok(())
1831 }
1832
1833 async fn upstream_request_filter(
1836 &self,
1837 _session: &mut Session,
1838 upstream_request: &mut pingora::http::RequestHeader,
1839 ctx: &mut Self::CTX,
1840 ) -> Result<()>
1841 where
1842 Self::CTX: Send + Sync,
1843 {
1844 trace!(
1845 correlation_id = %ctx.trace_id,
1846 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1847 "Applying upstream request modifications"
1848 );
1849
1850 upstream_request
1852 .insert_header("X-Trace-Id", &ctx.trace_id)
1853 .ok();
1854
1855 if let Some(ref span) = ctx.otel_span {
1857 let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1858 let traceparent =
1859 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1860 upstream_request
1861 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1862 .ok();
1863 }
1864
1865 upstream_request
1867 .insert_header("X-Forwarded-By", "Sentinel")
1868 .ok();
1869
1870 if let Some(ref route_config) = ctx.route_config {
1873 let mods = route_config.policies.request_headers.clone();
1874
1875 for (name, value) in mods.set {
1877 upstream_request.insert_header(name, value).ok();
1878 }
1879
1880 for (name, value) in mods.add {
1882 upstream_request.append_header(name, value).ok();
1883 }
1884
1885 for name in &mods.remove {
1887 upstream_request.remove_header(name);
1888 }
1889
1890 trace!(
1891 correlation_id = %ctx.trace_id,
1892 "Applied request header modifications"
1893 );
1894 }
1895
1896 upstream_request.remove_header("X-Internal-Token");
1898 upstream_request.remove_header("Authorization-Internal");
1899
1900 if let Some(ref route_config) = ctx.route_config {
1903 if let Some(ref shadow_config) = route_config.shadow {
1904 let pools_snapshot = self.upstream_pools.snapshot().await;
1906 let upstream_pools = std::sync::Arc::new(pools_snapshot);
1907
1908 let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1910
1911 let shadow_manager = crate::shadow::ShadowManager::new(
1913 upstream_pools,
1914 shadow_config.clone(),
1915 Some(std::sync::Arc::clone(&self.metrics)),
1916 route_id,
1917 );
1918
1919 if shadow_manager.should_shadow(upstream_request) {
1921 trace!(
1922 correlation_id = %ctx.trace_id,
1923 shadow_upstream = %shadow_config.upstream,
1924 percentage = shadow_config.percentage,
1925 "Shadowing request"
1926 );
1927
1928 let shadow_headers = upstream_request.clone();
1930
1931 let shadow_ctx = crate::upstream::RequestContext {
1933 client_ip: ctx.client_ip.parse().ok(),
1934 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
1936 method: ctx.method.clone(),
1937 };
1938
1939 let buffer_body = shadow_config.buffer_body
1941 && crate::shadow::should_buffer_method(&ctx.method);
1942
1943 if buffer_body {
1944 trace!(
1948 correlation_id = %ctx.trace_id,
1949 "Deferring shadow request until body is buffered"
1950 );
1951 ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
1952 headers: shadow_headers,
1953 manager: std::sync::Arc::new(shadow_manager),
1954 request_ctx: shadow_ctx,
1955 include_body: true,
1956 });
1957 if !ctx.body_inspection_enabled {
1960 ctx.body_inspection_enabled = true;
1961 }
1964 } else {
1965 shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
1967 ctx.shadow_sent = true;
1968 }
1969 }
1970 }
1971 }
1972
1973 Ok(())
1974 }
1975
1976 fn response_body_filter(
1982 &self,
1983 _session: &mut Session,
1984 body: &mut Option<Bytes>,
1985 end_of_stream: bool,
1986 ctx: &mut Self::CTX,
1987 ) -> Result<Option<Duration>, Box<Error>> {
1988 if ctx.is_websocket_upgrade {
1991 if let Some(ref handler) = ctx.websocket_handler {
1992 let handler = handler.clone();
1993 let data = body.take();
1994
1995 let result = tokio::task::block_in_place(|| {
1998 tokio::runtime::Handle::current()
1999 .block_on(async { handler.process_server_data(data).await })
2000 });
2001
2002 match result {
2003 crate::websocket::ProcessResult::Forward(data) => {
2004 *body = data;
2005 }
2006 crate::websocket::ProcessResult::Close(reason) => {
2007 warn!(
2008 correlation_id = %ctx.trace_id,
2009 code = reason.code,
2010 reason = %reason.reason,
2011 "WebSocket connection closed by agent (server->client)"
2012 );
2013 let close_frame =
2016 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2017 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2018 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2019 *body = Some(Bytes::from(encoded));
2020 }
2021 }
2022 }
2023 }
2024 return Ok(None);
2026 }
2027
2028 if let Some(ref chunk) = body {
2030 ctx.response_bytes += chunk.len() as u64;
2031
2032 trace!(
2033 correlation_id = %ctx.trace_id,
2034 chunk_size = chunk.len(),
2035 total_response_bytes = ctx.response_bytes,
2036 end_of_stream = end_of_stream,
2037 "Processing response body chunk"
2038 );
2039
2040 if let Some(ref mut counter) = ctx.inference_streaming_counter {
2042 let result = counter.process_chunk(chunk);
2043
2044 if result.content.is_some() || result.is_done {
2045 trace!(
2046 correlation_id = %ctx.trace_id,
2047 has_content = result.content.is_some(),
2048 is_done = result.is_done,
2049 chunks_processed = counter.chunks_processed(),
2050 accumulated_content_len = counter.content().len(),
2051 "Processed SSE chunk for token counting"
2052 );
2053 }
2054 }
2055
2056 if ctx.response_body_inspection_enabled
2060 && !ctx.response_body_inspection_agents.is_empty()
2061 {
2062 let config = ctx
2063 .config
2064 .get_or_insert_with(|| self.config_manager.current());
2065 let max_inspection_bytes = config
2066 .waf
2067 .as_ref()
2068 .map(|w| w.body_inspection.max_inspection_bytes as u64)
2069 .unwrap_or(1024 * 1024);
2070
2071 if ctx.response_body_bytes_inspected < max_inspection_bytes {
2072 let bytes_to_inspect = std::cmp::min(
2073 chunk.len() as u64,
2074 max_inspection_bytes - ctx.response_body_bytes_inspected,
2075 ) as usize;
2076
2077 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2081 ctx.response_body_chunk_index += 1;
2082
2083 trace!(
2084 correlation_id = %ctx.trace_id,
2085 bytes_inspected = ctx.response_body_bytes_inspected,
2086 max_inspection_bytes = max_inspection_bytes,
2087 chunk_index = ctx.response_body_chunk_index,
2088 "Tracking response body for inspection"
2089 );
2090 }
2091 }
2092 }
2093
2094 if end_of_stream {
2095 trace!(
2096 correlation_id = %ctx.trace_id,
2097 total_response_bytes = ctx.response_bytes,
2098 response_bytes_inspected = ctx.response_body_bytes_inspected,
2099 "Response body complete"
2100 );
2101 }
2102
2103 Ok(None)
2105 }
2106
2107 async fn connected_to_upstream(
2110 &self,
2111 _session: &mut Session,
2112 reused: bool,
2113 peer: &HttpPeer,
2114 #[cfg(unix)] _fd: RawFd,
2115 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2116 digest: Option<&Digest>,
2117 ctx: &mut Self::CTX,
2118 ) -> Result<(), Box<Error>> {
2119 ctx.connection_reused = reused;
2121
2122 if reused {
2124 trace!(
2125 correlation_id = %ctx.trace_id,
2126 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2127 peer_address = %peer.address(),
2128 "Reusing existing upstream connection"
2129 );
2130 } else {
2131 debug!(
2132 correlation_id = %ctx.trace_id,
2133 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2134 peer_address = %peer.address(),
2135 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2136 "Established new upstream connection"
2137 );
2138 }
2139
2140 Ok(())
2141 }
2142
2143 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2153 let route_id = match ctx.route_id.as_deref() {
2155 Some(id) => id,
2156 None => {
2157 trace!(
2158 correlation_id = %ctx.trace_id,
2159 "Cache filter: no route ID, skipping cache"
2160 );
2161 return Ok(());
2162 }
2163 };
2164
2165 if !self.cache_manager.is_enabled(route_id) {
2167 trace!(
2168 correlation_id = %ctx.trace_id,
2169 route_id = %route_id,
2170 "Cache disabled for route"
2171 );
2172 return Ok(());
2173 }
2174
2175 if !self
2177 .cache_manager
2178 .is_method_cacheable(route_id, &ctx.method)
2179 {
2180 trace!(
2181 correlation_id = %ctx.trace_id,
2182 route_id = %route_id,
2183 method = %ctx.method,
2184 "Method not cacheable"
2185 );
2186 return Ok(());
2187 }
2188
2189 debug!(
2191 correlation_id = %ctx.trace_id,
2192 route_id = %route_id,
2193 method = %ctx.method,
2194 path = %ctx.path,
2195 "Enabling HTTP caching for request"
2196 );
2197
2198 let storage = get_cache_storage();
2200 let eviction = get_cache_eviction();
2201 let cache_lock = get_cache_lock();
2202
2203 session.cache.enable(
2205 storage,
2206 Some(eviction),
2207 None, Some(cache_lock),
2209 None, );
2211
2212 ctx.cache_eligible = true;
2214
2215 trace!(
2216 correlation_id = %ctx.trace_id,
2217 route_id = %route_id,
2218 cache_enabled = session.cache.enabled(),
2219 "Cache enabled for request"
2220 );
2221
2222 Ok(())
2223 }
2224
2225 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2230 let req_header = session.req_header();
2231 let method = req_header.method.as_str();
2232 let path = req_header.uri.path();
2233 let host = ctx.host.as_deref().unwrap_or("unknown");
2234 let query = req_header.uri.query();
2235
2236 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2238
2239 trace!(
2240 correlation_id = %ctx.trace_id,
2241 cache_key = %key_string,
2242 "Generated cache key"
2243 );
2244
2245 Ok(CacheKey::default(req_header))
2248 }
2249
2250 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2255 session.cache.cache_miss();
2257
2258 if let Some(route_id) = ctx.route_id.as_deref() {
2260 self.cache_manager.stats().record_miss();
2261
2262 trace!(
2263 correlation_id = %ctx.trace_id,
2264 route_id = %route_id,
2265 path = %ctx.path,
2266 "Cache miss"
2267 );
2268 }
2269 }
2270
2271 async fn cache_hit_filter(
2277 &self,
2278 session: &mut Session,
2279 meta: &CacheMeta,
2280 _hit_handler: &mut HitHandler,
2281 is_fresh: bool,
2282 ctx: &mut Self::CTX,
2283 ) -> Result<Option<ForcedInvalidationKind>>
2284 where
2285 Self::CTX: Send + Sync,
2286 {
2287 let req_header = session.req_header();
2289 let method = req_header.method.as_str();
2290 let path = req_header.uri.path();
2291 let host = req_header.uri.host().unwrap_or("localhost");
2292 let query = req_header.uri.query();
2293
2294 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2296
2297 if self.cache_manager.should_invalidate(&cache_key) {
2299 info!(
2300 correlation_id = %ctx.trace_id,
2301 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2302 cache_key = %cache_key,
2303 "Cache entry invalidated by purge request"
2304 );
2305 return Ok(Some(ForcedInvalidationKind::ForceExpired));
2307 }
2308
2309 if is_fresh {
2311 self.cache_manager.stats().record_hit();
2312
2313 debug!(
2314 correlation_id = %ctx.trace_id,
2315 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2316 is_fresh = is_fresh,
2317 "Cache hit (fresh)"
2318 );
2319 } else {
2320 trace!(
2321 correlation_id = %ctx.trace_id,
2322 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2323 is_fresh = is_fresh,
2324 "Cache hit (stale)"
2325 );
2326 }
2327
2328 Ok(None)
2330 }
2331
2332 fn response_cache_filter(
2337 &self,
2338 _session: &Session,
2339 resp: &ResponseHeader,
2340 ctx: &mut Self::CTX,
2341 ) -> Result<RespCacheable> {
2342 let route_id = match ctx.route_id.as_deref() {
2343 Some(id) => id,
2344 None => {
2345 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2346 "no_route",
2347 )));
2348 }
2349 };
2350
2351 if !self.cache_manager.is_enabled(route_id) {
2353 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2354 "disabled",
2355 )));
2356 }
2357
2358 let status = resp.status.as_u16();
2359
2360 if !self.cache_manager.is_status_cacheable(route_id, status) {
2362 trace!(
2363 correlation_id = %ctx.trace_id,
2364 route_id = %route_id,
2365 status = status,
2366 "Status code not cacheable"
2367 );
2368 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2369 }
2370
2371 if let Some(cache_control) = resp.headers.get("cache-control") {
2373 if let Ok(cc_str) = cache_control.to_str() {
2374 if crate::cache::CacheManager::is_no_cache(cc_str) {
2375 trace!(
2376 correlation_id = %ctx.trace_id,
2377 route_id = %route_id,
2378 cache_control = %cc_str,
2379 "Response has no-cache directive"
2380 );
2381 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2382 }
2383 }
2384 }
2385
2386 let cache_control = resp
2388 .headers
2389 .get("cache-control")
2390 .and_then(|v| v.to_str().ok());
2391 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2392
2393 if ttl.is_zero() {
2394 trace!(
2395 correlation_id = %ctx.trace_id,
2396 route_id = %route_id,
2397 "TTL is zero, not caching"
2398 );
2399 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2400 }
2401
2402 let config = self
2404 .cache_manager
2405 .get_route_config(route_id)
2406 .unwrap_or_default();
2407
2408 let now = std::time::SystemTime::now();
2410 let fresh_until = now + ttl;
2411
2412 let header = resp.clone();
2414
2415 let cache_meta = CacheMeta::new(
2417 fresh_until,
2418 now,
2419 config.stale_while_revalidate_secs as u32,
2420 config.stale_if_error_secs as u32,
2421 header,
2422 );
2423
2424 self.cache_manager.stats().record_store();
2426
2427 debug!(
2428 correlation_id = %ctx.trace_id,
2429 route_id = %route_id,
2430 status = status,
2431 ttl_secs = ttl.as_secs(),
2432 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2433 stale_if_error_secs = config.stale_if_error_secs,
2434 "Caching response"
2435 );
2436
2437 Ok(RespCacheable::Cacheable(cache_meta))
2438 }
2439
2440 fn should_serve_stale(
2444 &self,
2445 _session: &mut Session,
2446 ctx: &mut Self::CTX,
2447 error: Option<&Error>,
2448 ) -> bool {
2449 let route_id = match ctx.route_id.as_deref() {
2450 Some(id) => id,
2451 None => return false,
2452 };
2453
2454 let config = match self.cache_manager.get_route_config(route_id) {
2456 Some(c) => c,
2457 None => return false,
2458 };
2459
2460 if let Some(e) = error {
2462 if e.esource() == &pingora::ErrorSource::Upstream {
2464 debug!(
2465 correlation_id = %ctx.trace_id,
2466 route_id = %route_id,
2467 error = %e,
2468 stale_if_error_secs = config.stale_if_error_secs,
2469 "Considering stale-if-error"
2470 );
2471 return config.stale_if_error_secs > 0;
2472 }
2473 }
2474
2475 if error.is_none() && config.stale_while_revalidate_secs > 0 {
2477 trace!(
2478 correlation_id = %ctx.trace_id,
2479 route_id = %route_id,
2480 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2481 "Allowing stale-while-revalidate"
2482 );
2483 return true;
2484 }
2485
2486 false
2487 }
2488
2489 fn range_header_filter(
2499 &self,
2500 session: &mut Session,
2501 response: &mut ResponseHeader,
2502 ctx: &mut Self::CTX,
2503 ) -> pingora_proxy::RangeType
2504 where
2505 Self::CTX: Send + Sync,
2506 {
2507 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2509 matches!(
2511 config.service_type,
2512 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2513 )
2514 });
2515
2516 if !supports_range {
2517 trace!(
2518 correlation_id = %ctx.trace_id,
2519 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2520 "Range request not supported for this route type"
2521 );
2522 return pingora_proxy::RangeType::None;
2523 }
2524
2525 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
2527
2528 match &range_type {
2529 pingora_proxy::RangeType::None => {
2530 trace!(
2531 correlation_id = %ctx.trace_id,
2532 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2533 "No range request or not applicable"
2534 );
2535 }
2536 pingora_proxy::RangeType::Single(range) => {
2537 trace!(
2538 correlation_id = %ctx.trace_id,
2539 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2540 range_start = range.start,
2541 range_end = range.end,
2542 "Processing single-range request"
2543 );
2544 }
2545 pingora_proxy::RangeType::Multi(multi) => {
2546 trace!(
2547 correlation_id = %ctx.trace_id,
2548 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2549 range_count = multi.ranges.len(),
2550 "Processing multi-range request"
2551 );
2552 }
2553 pingora_proxy::RangeType::Invalid => {
2554 debug!(
2555 correlation_id = %ctx.trace_id,
2556 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2557 "Invalid range header"
2558 );
2559 }
2560 }
2561
2562 range_type
2563 }
2564
2565 async fn fail_to_proxy(
2568 &self,
2569 session: &mut Session,
2570 e: &Error,
2571 ctx: &mut Self::CTX,
2572 ) -> pingora_proxy::FailToProxy
2573 where
2574 Self::CTX: Send + Sync,
2575 {
2576 let error_code = match e.etype() {
2577 ErrorType::ConnectRefused => 503,
2579 ErrorType::ConnectTimedout => 504,
2580 ErrorType::ConnectNoRoute => 502,
2581
2582 ErrorType::ReadTimedout => 504,
2584 ErrorType::WriteTimedout => 504,
2585
2586 ErrorType::TLSHandshakeFailure => 502,
2588 ErrorType::InvalidCert => 502,
2589
2590 ErrorType::InvalidHTTPHeader => 400,
2592 ErrorType::H2Error => 502,
2593
2594 ErrorType::ConnectProxyFailure => 502,
2596 ErrorType::ConnectionClosed => 502,
2597
2598 ErrorType::HTTPStatus(status) => *status,
2600
2601 ErrorType::InternalError => {
2603 let error_str = e.to_string();
2605 if error_str.contains("upstream")
2606 || error_str.contains("DNS")
2607 || error_str.contains("resolve")
2608 {
2609 502
2610 } else {
2611 500
2612 }
2613 }
2614
2615 _ => 502,
2617 };
2618
2619 error!(
2620 correlation_id = %ctx.trace_id,
2621 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2622 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2623 error_type = ?e.etype(),
2624 error = %e,
2625 error_code = error_code,
2626 "Proxy error occurred"
2627 );
2628
2629 self.metrics
2631 .record_blocked_request(&format!("proxy_error_{}", error_code));
2632
2633 let error_message = match error_code {
2637 400 => "Bad Request",
2638 502 => "Bad Gateway",
2639 503 => "Service Unavailable",
2640 504 => "Gateway Timeout",
2641 _ => "Internal Server Error",
2642 };
2643
2644 let body = format!(
2646 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2647 error_code, error_message, ctx.trace_id
2648 );
2649
2650 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2652 header
2653 .insert_header("Content-Type", "application/json")
2654 .ok();
2655 header
2656 .insert_header("Content-Length", body.len().to_string())
2657 .ok();
2658 header
2659 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2660 .ok();
2661 header.insert_header("Connection", "close").ok();
2662
2663 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2665 warn!(
2666 correlation_id = %ctx.trace_id,
2667 error = %write_err,
2668 "Failed to write error response header"
2669 );
2670 } else {
2671 if let Err(write_err) = session
2673 .write_response_body(Some(bytes::Bytes::from(body)), true)
2674 .await
2675 {
2676 warn!(
2677 correlation_id = %ctx.trace_id,
2678 error = %write_err,
2679 "Failed to write error response body"
2680 );
2681 }
2682 }
2683
2684 pingora_proxy::FailToProxy {
2687 error_code,
2688 can_reuse_downstream: false,
2689 }
2690 }
2691
2692 fn error_while_proxy(
2698 &self,
2699 peer: &HttpPeer,
2700 session: &mut Session,
2701 e: Box<Error>,
2702 ctx: &mut Self::CTX,
2703 client_reused: bool,
2704 ) -> Box<Error> {
2705 let error_type = e.etype().clone();
2706 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2707
2708 let is_retryable = matches!(
2710 error_type,
2711 ErrorType::ConnectTimedout
2712 | ErrorType::ReadTimedout
2713 | ErrorType::WriteTimedout
2714 | ErrorType::ConnectionClosed
2715 | ErrorType::ConnectRefused
2716 );
2717
2718 warn!(
2720 correlation_id = %ctx.trace_id,
2721 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2722 upstream = %upstream_id,
2723 peer_address = %peer.address(),
2724 error_type = ?error_type,
2725 error = %e,
2726 client_reused = client_reused,
2727 is_retryable = is_retryable,
2728 "Error during proxy operation"
2729 );
2730
2731 let peer_address = peer.address().to_string();
2734 let upstream_pools = self.upstream_pools.clone();
2735 let upstream_id_owned = upstream_id.to_string();
2736 tokio::spawn(async move {
2737 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2738 pool.report_result(&peer_address, false).await;
2739 }
2740 });
2741
2742 self.metrics
2744 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2745
2746 let mut enhanced_error = e.more_context(format!(
2748 "Upstream: {}, Peer: {}, Attempts: {}",
2749 upstream_id,
2750 peer.address(),
2751 ctx.upstream_attempts
2752 ));
2753
2754 if is_retryable {
2759 let can_retry = if client_reused {
2760 !session.as_ref().retry_buffer_truncated()
2762 } else {
2763 true
2765 };
2766
2767 enhanced_error.retry.decide_reuse(can_retry);
2768
2769 if can_retry {
2770 debug!(
2771 correlation_id = %ctx.trace_id,
2772 upstream = %upstream_id,
2773 error_type = ?error_type,
2774 "Error is retryable, will attempt retry"
2775 );
2776 }
2777 } else {
2778 enhanced_error.retry.decide_reuse(false);
2780 }
2781
2782 enhanced_error
2783 }
2784
2785 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2786 self.reload_coordinator.dec_requests();
2788
2789 if !ctx.shadow_sent {
2791 if let Some(shadow_pending) = ctx.shadow_pending.take() {
2792 let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2793 Some(ctx.body_buffer.clone())
2795 } else {
2796 None
2797 };
2798
2799 trace!(
2800 correlation_id = %ctx.trace_id,
2801 body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2802 "Firing deferred shadow request with buffered body"
2803 );
2804
2805 shadow_pending.manager.shadow_request(
2806 shadow_pending.headers,
2807 body,
2808 shadow_pending.request_ctx,
2809 );
2810 ctx.shadow_sent = true;
2811 }
2812 }
2813
2814 let duration = ctx.elapsed();
2815
2816 let status = session
2818 .response_written()
2819 .map(|r| r.status.as_u16())
2820 .unwrap_or(0);
2821
2822 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2825 (&ctx.selected_upstream_address, &ctx.upstream)
2826 {
2827 let success = status > 0 && status < 500;
2829
2830 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2831 pool.report_result_with_latency(peer_addr, success, Some(duration))
2832 .await;
2833 trace!(
2834 correlation_id = %ctx.trace_id,
2835 upstream = %upstream_id,
2836 peer_address = %peer_addr,
2837 success = success,
2838 duration_ms = duration.as_millis(),
2839 status = status,
2840 "Reported result to adaptive load balancer"
2841 );
2842 }
2843
2844 if ctx.inference_rate_limit_enabled && success {
2846 let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2847 if cold_detected {
2848 debug!(
2849 correlation_id = %ctx.trace_id,
2850 upstream = %upstream_id,
2851 peer_address = %peer_addr,
2852 duration_ms = duration.as_millis(),
2853 "Cold model detected on inference upstream"
2854 );
2855 }
2856 }
2857 }
2858
2859 if ctx.inference_rate_limit_enabled {
2862 if let (Some(route_id), Some(ref rate_limit_key)) =
2863 (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2864 {
2865 let response_headers = session
2867 .response_written()
2868 .map(|r| &r.headers)
2869 .cloned()
2870 .unwrap_or_default();
2871
2872 let streaming_result = if ctx.inference_streaming_response {
2874 ctx.inference_streaming_counter
2875 .as_ref()
2876 .map(|counter| counter.finalize())
2877 } else {
2878 None
2879 };
2880
2881 if let Some(ref result) = streaming_result {
2883 debug!(
2884 correlation_id = %ctx.trace_id,
2885 output_tokens = result.output_tokens,
2886 input_tokens = ?result.input_tokens,
2887 source = ?result.source,
2888 content_length = result.content_length,
2889 "Finalized streaming token count"
2890 );
2891 }
2892
2893 if ctx.inference_streaming_response {
2895 if let Some(ref route_config) = ctx.route_config {
2896 if let Some(ref inference) = route_config.inference {
2897 if let Some(ref guardrails) = inference.guardrails {
2898 if let Some(ref pii_config) = guardrails.pii_detection {
2899 if pii_config.enabled {
2900 if let Some(ref counter) = ctx.inference_streaming_counter {
2902 let response_content = counter.content();
2903 if !response_content.is_empty() {
2904 let pii_result = self
2905 .guardrail_processor
2906 .check_pii(
2907 pii_config,
2908 response_content,
2909 ctx.route_id.as_deref(),
2910 &ctx.trace_id,
2911 )
2912 .await;
2913
2914 match pii_result {
2915 crate::inference::PiiCheckResult::Detected {
2916 detections,
2917 redacted_content: _,
2918 } => {
2919 warn!(
2920 correlation_id = %ctx.trace_id,
2921 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2922 detection_count = detections.len(),
2923 "PII detected in inference response"
2924 );
2925
2926 ctx.pii_detection_categories = detections
2928 .iter()
2929 .map(|d| d.category.clone())
2930 .collect();
2931
2932 for detection in &detections {
2934 self.metrics.record_pii_detected(
2935 ctx.route_id.as_deref().unwrap_or("unknown"),
2936 &detection.category,
2937 );
2938 }
2939 }
2940 crate::inference::PiiCheckResult::Clean => {
2941 trace!(
2942 correlation_id = %ctx.trace_id,
2943 "No PII detected in response"
2944 );
2945 }
2946 crate::inference::PiiCheckResult::Error { message } => {
2947 debug!(
2948 correlation_id = %ctx.trace_id,
2949 error = %message,
2950 "PII detection check failed"
2951 );
2952 }
2953 }
2954 }
2955 }
2956 }
2957 }
2958 }
2959 }
2960 }
2961 }
2962
2963 let empty_body: &[u8] = &[];
2967
2968 if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
2969 route_id,
2970 rate_limit_key,
2971 &response_headers,
2972 empty_body,
2973 ctx.inference_estimated_tokens,
2974 ) {
2975 let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
2977 if streaming.total_tokens.is_some() {
2979 (streaming.total_tokens.unwrap(), "streaming_api")
2980 } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
2981 let total = ctx.inference_input_tokens + streaming.output_tokens;
2984 (total, "streaming_tiktoken")
2985 } else {
2986 (actual_estimate.tokens, "headers")
2987 }
2988 } else {
2989 (actual_estimate.tokens, "headers")
2990 };
2991
2992 ctx.inference_actual_tokens = Some(actual_tokens);
2993
2994 debug!(
2995 correlation_id = %ctx.trace_id,
2996 route_id = route_id,
2997 estimated_tokens = ctx.inference_estimated_tokens,
2998 actual_tokens = actual_tokens,
2999 source = source_info,
3000 streaming_response = ctx.inference_streaming_response,
3001 model = ?ctx.inference_model,
3002 "Recorded actual inference tokens"
3003 );
3004
3005 if ctx.inference_budget_enabled {
3007 let alerts = self.inference_rate_limit_manager.record_budget(
3008 route_id,
3009 rate_limit_key,
3010 actual_tokens,
3011 );
3012
3013 for alert in alerts.iter() {
3015 warn!(
3016 correlation_id = %ctx.trace_id,
3017 route_id = route_id,
3018 tenant = %alert.tenant,
3019 threshold_pct = alert.threshold * 100.0,
3020 tokens_used = alert.tokens_used,
3021 tokens_limit = alert.tokens_limit,
3022 "Token budget alert threshold crossed"
3023 );
3024 }
3025
3026 if let Some(status) = self.inference_rate_limit_manager.budget_status(
3028 route_id,
3029 rate_limit_key,
3030 ) {
3031 ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3032 }
3033 }
3034
3035 if ctx.inference_cost_enabled {
3037 if let Some(model) = ctx.inference_model.as_deref() {
3038 let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3040 let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3042 let output = streaming.output_tokens;
3043 (input, output)
3044 } else {
3045 let input = ctx.inference_input_tokens;
3047 let output = actual_tokens.saturating_sub(input);
3048 (input, output)
3049 };
3050 ctx.inference_output_tokens = output_tokens;
3051
3052 if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3053 route_id,
3054 model,
3055 input_tokens,
3056 output_tokens,
3057 ) {
3058 ctx.inference_request_cost = Some(cost_result.total_cost);
3059
3060 trace!(
3061 correlation_id = %ctx.trace_id,
3062 route_id = route_id,
3063 model = model,
3064 input_tokens = input_tokens,
3065 output_tokens = output_tokens,
3066 total_cost = cost_result.total_cost,
3067 currency = %cost_result.currency,
3068 "Calculated inference request cost"
3069 );
3070 }
3071 }
3072 }
3073 }
3074 }
3075 }
3076
3077 if self.log_manager.access_log_enabled() {
3079 let access_entry = AccessLogEntry {
3080 timestamp: chrono::Utc::now().to_rfc3339(),
3081 trace_id: ctx.trace_id.clone(),
3082 method: ctx.method.clone(),
3083 path: ctx.path.clone(),
3084 query: ctx.query.clone(),
3085 protocol: "HTTP/1.1".to_string(),
3086 status,
3087 body_bytes: ctx.response_bytes,
3088 duration_ms: duration.as_millis() as u64,
3089 client_ip: ctx.client_ip.clone(),
3090 user_agent: ctx.user_agent.clone(),
3091 referer: ctx.referer.clone(),
3092 host: ctx.host.clone(),
3093 route_id: ctx.route_id.clone(),
3094 upstream: ctx.upstream.clone(),
3095 upstream_attempts: ctx.upstream_attempts,
3096 instance_id: self.app_state.instance_id.clone(),
3097 namespace: ctx.namespace.clone(),
3098 service: ctx.service.clone(),
3099 body_bytes_sent: ctx.response_bytes,
3101 upstream_addr: ctx.selected_upstream_address.clone(),
3102 connection_reused: ctx.connection_reused,
3103 rate_limit_hit: status == 429,
3104 geo_country: ctx.geo_country_code.clone(),
3105 };
3106 self.log_manager.log_access(&access_entry);
3107 }
3108
3109 if tracing::enabled!(tracing::Level::DEBUG) {
3111 debug!(
3112 trace_id = %ctx.trace_id,
3113 method = %ctx.method,
3114 path = %ctx.path,
3115 route_id = ?ctx.route_id,
3116 upstream = ?ctx.upstream,
3117 status = status,
3118 duration_ms = duration.as_millis() as u64,
3119 upstream_attempts = ctx.upstream_attempts,
3120 error = ?_error.map(|e| e.to_string()),
3121 "Request completed"
3122 );
3123 }
3124
3125 if ctx.is_websocket_upgrade && status == 101 {
3127 info!(
3128 trace_id = %ctx.trace_id,
3129 route_id = ?ctx.route_id,
3130 upstream = ?ctx.upstream,
3131 client_ip = %ctx.client_ip,
3132 "WebSocket connection established"
3133 );
3134 }
3135
3136 if let Some(span) = ctx.otel_span.take() {
3138 span.end();
3139 }
3140 }
3141}
3142
3143impl SentinelProxy {
3148 async fn process_body_chunk_streaming(
3150 &self,
3151 body: &mut Option<Bytes>,
3152 end_of_stream: bool,
3153 ctx: &mut RequestContext,
3154 ) -> Result<(), Box<Error>> {
3155 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3157 let chunk_index = ctx.request_body_chunk_index;
3158 ctx.request_body_chunk_index += 1;
3159 ctx.body_bytes_inspected += chunk_data.len() as u64;
3160
3161 debug!(
3162 correlation_id = %ctx.trace_id,
3163 chunk_index = chunk_index,
3164 chunk_size = chunk_data.len(),
3165 end_of_stream = end_of_stream,
3166 "Streaming body chunk to agents"
3167 );
3168
3169 let agent_ctx = crate::agents::AgentCallContext {
3171 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3172 metadata: sentinel_agent_protocol::RequestMetadata {
3173 correlation_id: ctx.trace_id.clone(),
3174 request_id: ctx.trace_id.clone(),
3175 client_ip: ctx.client_ip.clone(),
3176 client_port: 0,
3177 server_name: ctx.host.clone(),
3178 protocol: "HTTP/1.1".to_string(),
3179 tls_version: None,
3180 tls_cipher: None,
3181 route_id: ctx.route_id.clone(),
3182 upstream_id: ctx.upstream.clone(),
3183 timestamp: chrono::Utc::now().to_rfc3339(),
3184 traceparent: ctx.traceparent(),
3185 },
3186 route_id: ctx.route_id.clone(),
3187 upstream_id: ctx.upstream.clone(),
3188 request_body: None, response_body: None,
3190 };
3191
3192 let agent_ids = ctx.body_inspection_agents.clone();
3193 let total_size = None; match self
3196 .agent_manager
3197 .process_request_body_streaming(
3198 &agent_ctx,
3199 &chunk_data,
3200 end_of_stream,
3201 chunk_index,
3202 ctx.body_bytes_inspected as usize,
3203 total_size,
3204 &agent_ids,
3205 )
3206 .await
3207 {
3208 Ok(decision) => {
3209 ctx.agent_needs_more = decision.needs_more;
3211
3212 if let Some(ref mutation) = decision.request_body_mutation {
3214 if !mutation.is_pass_through() {
3215 if mutation.is_drop() {
3216 *body = None;
3218 trace!(
3219 correlation_id = %ctx.trace_id,
3220 chunk_index = chunk_index,
3221 "Agent dropped body chunk"
3222 );
3223 } else if let Some(ref new_data) = mutation.data {
3224 *body = Some(Bytes::from(new_data.clone()));
3226 trace!(
3227 correlation_id = %ctx.trace_id,
3228 chunk_index = chunk_index,
3229 original_size = chunk_data.len(),
3230 new_size = new_data.len(),
3231 "Agent mutated body chunk"
3232 );
3233 }
3234 }
3235 }
3236
3237 if !decision.needs_more && !decision.is_allow() {
3239 warn!(
3240 correlation_id = %ctx.trace_id,
3241 action = ?decision.action,
3242 "Agent blocked request body"
3243 );
3244 self.metrics.record_blocked_request("agent_body_inspection");
3245
3246 let (status, message) = match &decision.action {
3247 crate::agents::AgentAction::Block { status, body, .. } => (
3248 *status,
3249 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3250 ),
3251 _ => (403, "Forbidden".to_string()),
3252 };
3253
3254 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3255 }
3256
3257 trace!(
3258 correlation_id = %ctx.trace_id,
3259 needs_more = decision.needs_more,
3260 "Agent processed body chunk"
3261 );
3262 }
3263 Err(e) => {
3264 let fail_closed = ctx
3265 .route_config
3266 .as_ref()
3267 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3268 .unwrap_or(false);
3269
3270 if fail_closed {
3271 error!(
3272 correlation_id = %ctx.trace_id,
3273 error = %e,
3274 "Agent streaming body inspection failed, blocking (fail-closed)"
3275 );
3276 return Err(Error::explain(
3277 ErrorType::HTTPStatus(503),
3278 "Service unavailable",
3279 ));
3280 } else {
3281 warn!(
3282 correlation_id = %ctx.trace_id,
3283 error = %e,
3284 "Agent streaming body inspection failed, allowing (fail-open)"
3285 );
3286 }
3287 }
3288 }
3289
3290 Ok(())
3291 }
3292
3293 async fn send_buffered_body_to_agents(
3295 &self,
3296 end_of_stream: bool,
3297 ctx: &mut RequestContext,
3298 ) -> Result<(), Box<Error>> {
3299 debug!(
3300 correlation_id = %ctx.trace_id,
3301 buffer_size = ctx.body_buffer.len(),
3302 end_of_stream = end_of_stream,
3303 agent_count = ctx.body_inspection_agents.len(),
3304 decompression_enabled = ctx.decompression_enabled,
3305 "Sending buffered body to agents for inspection"
3306 );
3307
3308 let body_for_inspection = if ctx.decompression_enabled {
3310 if let Some(ref encoding) = ctx.body_content_encoding {
3311 let config = crate::decompression::DecompressionConfig {
3312 max_ratio: ctx.max_decompression_ratio,
3313 max_output_bytes: ctx.max_decompression_bytes,
3314 };
3315
3316 match crate::decompression::decompress_body(
3317 &ctx.body_buffer,
3318 encoding,
3319 &config,
3320 ) {
3321 Ok(result) => {
3322 ctx.body_was_decompressed = true;
3323 self.metrics
3324 .record_decompression_success(encoding, result.ratio);
3325 debug!(
3326 correlation_id = %ctx.trace_id,
3327 encoding = %encoding,
3328 compressed_size = result.compressed_size,
3329 decompressed_size = result.decompressed_size,
3330 ratio = result.ratio,
3331 "Body decompressed for agent inspection"
3332 );
3333 result.data
3334 }
3335 Err(e) => {
3336 let failure_reason = match &e {
3338 crate::decompression::DecompressionError::RatioExceeded { .. } => {
3339 "ratio_exceeded"
3340 }
3341 crate::decompression::DecompressionError::SizeExceeded { .. } => {
3342 "size_exceeded"
3343 }
3344 crate::decompression::DecompressionError::InvalidData { .. } => {
3345 "invalid_data"
3346 }
3347 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3348 "unsupported"
3349 }
3350 crate::decompression::DecompressionError::IoError(_) => "io_error",
3351 };
3352 self.metrics
3353 .record_decompression_failure(encoding, failure_reason);
3354
3355 let fail_closed = ctx
3357 .route_config
3358 .as_ref()
3359 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3360 .unwrap_or(false);
3361
3362 if fail_closed {
3363 error!(
3364 correlation_id = %ctx.trace_id,
3365 error = %e,
3366 encoding = %encoding,
3367 "Decompression failed, blocking (fail-closed)"
3368 );
3369 return Err(Error::explain(
3370 ErrorType::HTTPStatus(400),
3371 "Invalid compressed body",
3372 ));
3373 } else {
3374 warn!(
3375 correlation_id = %ctx.trace_id,
3376 error = %e,
3377 encoding = %encoding,
3378 "Decompression failed, sending compressed body (fail-open)"
3379 );
3380 ctx.body_buffer.clone()
3381 }
3382 }
3383 }
3384 } else {
3385 ctx.body_buffer.clone()
3386 }
3387 } else {
3388 ctx.body_buffer.clone()
3389 };
3390
3391 let agent_ctx = crate::agents::AgentCallContext {
3392 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3393 metadata: sentinel_agent_protocol::RequestMetadata {
3394 correlation_id: ctx.trace_id.clone(),
3395 request_id: ctx.trace_id.clone(),
3396 client_ip: ctx.client_ip.clone(),
3397 client_port: 0,
3398 server_name: ctx.host.clone(),
3399 protocol: "HTTP/1.1".to_string(),
3400 tls_version: None,
3401 tls_cipher: None,
3402 route_id: ctx.route_id.clone(),
3403 upstream_id: ctx.upstream.clone(),
3404 timestamp: chrono::Utc::now().to_rfc3339(),
3405 traceparent: ctx.traceparent(),
3406 },
3407 route_id: ctx.route_id.clone(),
3408 upstream_id: ctx.upstream.clone(),
3409 request_body: Some(body_for_inspection.clone()),
3410 response_body: None,
3411 };
3412
3413 let agent_ids = ctx.body_inspection_agents.clone();
3414 match self
3415 .agent_manager
3416 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3417 .await
3418 {
3419 Ok(decision) => {
3420 if !decision.is_allow() {
3421 warn!(
3422 correlation_id = %ctx.trace_id,
3423 action = ?decision.action,
3424 "Agent blocked request body"
3425 );
3426 self.metrics.record_blocked_request("agent_body_inspection");
3427
3428 let (status, message) = match &decision.action {
3429 crate::agents::AgentAction::Block { status, body, .. } => (
3430 *status,
3431 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3432 ),
3433 _ => (403, "Forbidden".to_string()),
3434 };
3435
3436 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3437 }
3438
3439 trace!(
3440 correlation_id = %ctx.trace_id,
3441 "Agent allowed request body"
3442 );
3443 }
3444 Err(e) => {
3445 let fail_closed = ctx
3446 .route_config
3447 .as_ref()
3448 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3449 .unwrap_or(false);
3450
3451 if fail_closed {
3452 error!(
3453 correlation_id = %ctx.trace_id,
3454 error = %e,
3455 "Agent body inspection failed, blocking (fail-closed)"
3456 );
3457 return Err(Error::explain(
3458 ErrorType::HTTPStatus(503),
3459 "Service unavailable",
3460 ));
3461 } else {
3462 warn!(
3463 correlation_id = %ctx.trace_id,
3464 error = %e,
3465 "Agent body inspection failed, allowing (fail-open)"
3466 );
3467 }
3468 }
3469 }
3470
3471 Ok(())
3472 }
3473}