1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23 extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39 fn get_header(&self, _name: &str) -> Option<String> {
40 None
41 }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46 type CTX = RequestContext;
47
48 fn new_ctx(&self) -> Self::CTX {
49 RequestContext::new()
50 }
51
52 fn fail_to_connect(
53 &self,
54 _session: &mut Session,
55 peer: &HttpPeer,
56 ctx: &mut Self::CTX,
57 e: Box<Error>,
58 ) -> Box<Error> {
59 error!(
60 correlation_id = %ctx.trace_id,
61 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63 peer_address = %peer.address(),
64 error = %e,
65 "Failed to connect to upstream peer"
66 );
67 e
69 }
70
71 async fn early_request_filter(
74 &self,
75 session: &mut Session,
76 ctx: &mut Self::CTX,
77 ) -> Result<(), Box<Error>> {
78 let req_header = session.req_header();
80 let method = req_header.method.as_str();
81 let path = req_header.uri.path();
82 let host = req_header
83 .headers
84 .get("host")
85 .and_then(|h| h.to_str().ok())
86 .unwrap_or("");
87
88 if let Some(ref challenge_manager) = self.acme_challenges {
90 if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
91 if let Some(key_authorization) = challenge_manager.get_response(token) {
92 debug!(
93 token = %token,
94 "Serving ACME HTTP-01 challenge response"
95 );
96
97 let mut resp = ResponseHeader::build(200, None)?;
99 resp.insert_header("Content-Type", "text/plain")?;
100 resp.insert_header("Content-Length", key_authorization.len().to_string())?;
101
102 session
104 .write_response_header(Box::new(resp), false)
105 .await?;
106 session
107 .write_response_body(Some(Bytes::from(key_authorization)), true)
108 .await?;
109
110 return Err(Error::explain(
112 ErrorType::InternalError,
113 "ACME challenge served",
114 ));
115 } else {
116 warn!(
118 token = %token,
119 "ACME challenge token not found"
120 );
121 }
122 }
123 }
124
125 ctx.method = method.to_string();
126 ctx.path = path.to_string();
127 ctx.host = Some(host.to_string());
128
129 let route_match = {
131 let route_matcher = self.route_matcher.read();
132 let request_info = RequestInfo::new(method, path, host);
133 match route_matcher.match_request(&request_info) {
134 Some(m) => m,
135 None => return Ok(()), }
137 };
138
139 ctx.trace_id = self.get_trace_id(session);
140 ctx.route_id = Some(route_match.route_id.to_string());
141 ctx.route_config = Some(route_match.config.clone());
142
143 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
145 if let Ok(s) = traceparent.to_str() {
146 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
147 }
148 }
149
150 if let Some(tracer) = crate::otel::get_tracer() {
152 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
153 }
154
155 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
157 trace!(
158 correlation_id = %ctx.trace_id,
159 route_id = %route_match.route_id,
160 builtin_handler = ?route_match.config.builtin_handler,
161 "Handling builtin route in early_request_filter"
162 );
163
164 let handled = self
166 .handle_builtin_route(session, ctx, &route_match)
167 .await?;
168
169 if handled {
170 return Err(Error::explain(
172 ErrorType::InternalError,
173 "Builtin handler complete",
174 ));
175 }
176 }
177
178 Ok(())
179 }
180
181 async fn upstream_peer(
182 &self,
183 session: &mut Session,
184 ctx: &mut Self::CTX,
185 ) -> Result<Box<HttpPeer>, Box<Error>> {
186 self.reload_coordinator.inc_requests();
188
189 if ctx.config.is_none() {
191 ctx.config = Some(self.config_manager.current());
192 }
193
194 if ctx.client_ip.is_empty() {
196 ctx.client_ip = session
197 .client_addr()
198 .map(|a| a.to_string())
199 .unwrap_or_else(|| "unknown".to_string());
200 }
201
202 let req_header = session.req_header();
203
204 if ctx.method.is_empty() {
206 ctx.method = req_header.method.to_string();
207 ctx.path = req_header.uri.path().to_string();
208 ctx.query = req_header.uri.query().map(|q| q.to_string());
209 ctx.host = req_header
210 .headers
211 .get("host")
212 .and_then(|v| v.to_str().ok())
213 .map(|s| s.to_string());
214 }
215 ctx.user_agent = req_header
216 .headers
217 .get("user-agent")
218 .and_then(|v| v.to_str().ok())
219 .map(|s| s.to_string());
220 ctx.referer = req_header
221 .headers
222 .get("referer")
223 .and_then(|v| v.to_str().ok())
224 .map(|s| s.to_string());
225
226 trace!(
227 correlation_id = %ctx.trace_id,
228 client_ip = %ctx.client_ip,
229 "Request received, initializing context"
230 );
231
232 let route_match = if let Some(ref route_config) = ctx.route_config {
234 let route_id = ctx.route_id.as_deref().unwrap_or("");
235 crate::routing::RouteMatch {
236 route_id: sentinel_common::RouteId::new(route_id),
237 config: route_config.clone(),
238 }
239 } else {
240 let (match_result, route_duration) = {
242 let route_matcher = self.route_matcher.read();
243 let host = ctx.host.as_deref().unwrap_or("");
244
245 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
247
248 if route_matcher.needs_headers() {
250 request_info = request_info
251 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
252 }
253
254 if route_matcher.needs_query_params() {
256 request_info =
257 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
258 }
259
260 trace!(
261 correlation_id = %ctx.trace_id,
262 method = %request_info.method,
263 path = %request_info.path,
264 host = %request_info.host,
265 "Built request info for route matching"
266 );
267
268 let route_start = std::time::Instant::now();
269 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
270 warn!(
271 correlation_id = %ctx.trace_id,
272 method = %request_info.method,
273 path = %request_info.path,
274 host = %request_info.host,
275 "No matching route found for request"
276 );
277 Error::explain(ErrorType::InternalError, "No matching route found")
278 })?;
279 let route_duration = route_start.elapsed();
280 (route_match, route_duration)
282 };
283
284 ctx.route_id = Some(match_result.route_id.to_string());
285 ctx.route_config = Some(match_result.config.clone());
286
287 if ctx.trace_id.is_empty() {
289 ctx.trace_id = self.get_trace_id(session);
290
291 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
293 {
294 if let Ok(s) = traceparent.to_str() {
295 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
296 }
297 }
298
299 if let Some(tracer) = crate::otel::get_tracer() {
301 ctx.otel_span = Some(tracer.start_span(
302 &ctx.method,
303 &ctx.path,
304 ctx.trace_context.as_ref(),
305 ));
306 }
307 }
308
309 trace!(
310 correlation_id = %ctx.trace_id,
311 route_id = %match_result.route_id,
312 route_duration_us = route_duration.as_micros(),
313 service_type = ?match_result.config.service_type,
314 "Route matched"
315 );
316 match_result
317 };
318
319 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
321 trace!(
322 correlation_id = %ctx.trace_id,
323 route_id = %route_match.route_id,
324 builtin_handler = ?route_match.config.builtin_handler,
325 "Route type is builtin, skipping upstream"
326 );
327 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
329 return Err(Error::explain(
331 ErrorType::InternalError,
332 "Builtin handler handled in request_filter",
333 ));
334 }
335
336 if route_match.config.service_type == sentinel_config::ServiceType::Static {
338 trace!(
339 correlation_id = %ctx.trace_id,
340 route_id = %route_match.route_id,
341 "Route type is static, checking for static server"
342 );
343 if self
345 .static_servers
346 .get(route_match.route_id.as_str())
347 .await
348 .is_some()
349 {
350 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
352 info!(
353 correlation_id = %ctx.trace_id,
354 route_id = %route_match.route_id,
355 path = %ctx.path,
356 "Serving static file"
357 );
358 return Err(Error::explain(
360 ErrorType::InternalError,
361 "Static file serving handled in request_filter",
362 ));
363 }
364 }
365
366 let mut model_routing_applied = false;
369 if let Some(ref inference) = route_match.config.inference {
370 if let Some(ref model_routing) = inference.model_routing {
371 let model = model_routing::extract_model_from_headers(&req_header.headers);
373
374 if let Some(ref model_name) = model {
375 if let Some(routing_result) =
377 model_routing::find_upstream_for_model(model_routing, model_name)
378 {
379 debug!(
380 correlation_id = %ctx.trace_id,
381 route_id = %route_match.route_id,
382 model = %model_name,
383 upstream = %routing_result.upstream,
384 is_default = routing_result.is_default,
385 provider_override = ?routing_result.provider,
386 "Model-based routing selected upstream"
387 );
388
389 ctx.record_model_routing(
390 &routing_result.upstream,
391 Some(model_name.clone()),
392 routing_result.provider,
393 );
394 model_routing_applied = true;
395
396 if let Some(metrics) = get_model_routing_metrics() {
398 metrics.record_model_routed(
399 route_match.route_id.as_str(),
400 model_name,
401 &routing_result.upstream,
402 );
403 if routing_result.is_default {
404 metrics.record_default_upstream(route_match.route_id.as_str());
405 }
406 if let Some(provider) = routing_result.provider {
407 metrics.record_provider_override(
408 route_match.route_id.as_str(),
409 &routing_result.upstream,
410 provider.as_str(),
411 );
412 }
413 }
414 }
415 } else if let Some(ref default_upstream) = model_routing.default_upstream {
416 debug!(
418 correlation_id = %ctx.trace_id,
419 route_id = %route_match.route_id,
420 upstream = %default_upstream,
421 "Model-based routing using default upstream (no model header)"
422 );
423 ctx.record_model_routing(default_upstream, None, None);
424 model_routing_applied = true;
425
426 if let Some(metrics) = get_model_routing_metrics() {
428 metrics.record_no_model_header(route_match.route_id.as_str());
429 }
430 }
431 }
432 }
433
434 if !model_routing_applied {
436 if let Some(ref upstream) = route_match.config.upstream {
437 ctx.upstream = Some(upstream.clone());
438 trace!(
439 correlation_id = %ctx.trace_id,
440 route_id = %route_match.route_id,
441 upstream = %upstream,
442 "Upstream configured for route"
443 );
444 } else {
445 error!(
446 correlation_id = %ctx.trace_id,
447 route_id = %route_match.route_id,
448 "Route has no upstream configured"
449 );
450 return Err(Error::explain(
451 ErrorType::InternalError,
452 format!(
453 "Route '{}' has no upstream configured",
454 route_match.route_id
455 ),
456 ));
457 }
458 }
459
460 if let Some(ref fallback_config) = route_match.config.fallback {
463 let upstream_name = ctx.upstream.as_ref().unwrap();
464
465 let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
467 pool.has_healthy_targets().await
468 } else {
469 false };
471
472 let is_budget_exhausted = ctx.inference_budget_exhausted;
474
475 let current_model = ctx.inference_model.as_deref();
477
478 let evaluator = FallbackEvaluator::new(
480 fallback_config,
481 ctx.tried_upstreams(),
482 ctx.fallback_attempt,
483 );
484
485 if let Some(decision) = evaluator.should_fallback_before_request(
487 upstream_name,
488 is_healthy,
489 is_budget_exhausted,
490 current_model,
491 ) {
492 info!(
493 correlation_id = %ctx.trace_id,
494 route_id = %route_match.route_id,
495 from_upstream = %upstream_name,
496 to_upstream = %decision.next_upstream,
497 reason = %decision.reason,
498 fallback_attempt = ctx.fallback_attempt + 1,
499 "Triggering fallback routing"
500 );
501
502 if let Some(metrics) = get_fallback_metrics() {
504 metrics.record_fallback_attempt(
505 route_match.route_id.as_str(),
506 upstream_name,
507 &decision.next_upstream,
508 &decision.reason,
509 );
510 }
511
512 ctx.record_fallback(decision.reason, &decision.next_upstream);
514
515 if let Some((original, mapped)) = decision.model_mapping {
517 if let Some(metrics) = get_fallback_metrics() {
519 metrics.record_model_mapping(
520 route_match.route_id.as_str(),
521 &original,
522 &mapped,
523 );
524 }
525
526 ctx.record_model_mapping(original, mapped);
527 trace!(
528 correlation_id = %ctx.trace_id,
529 original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
530 mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
531 "Applied model mapping for fallback"
532 );
533 }
534 }
535 }
536
537 debug!(
538 correlation_id = %ctx.trace_id,
539 route_id = %route_match.route_id,
540 upstream = ?ctx.upstream,
541 method = %req_header.method,
542 path = %req_header.uri.path(),
543 host = ctx.host.as_deref().unwrap_or("-"),
544 client_ip = %ctx.client_ip,
545 "Processing request"
546 );
547
548 if ctx
550 .upstream
551 .as_ref()
552 .is_some_and(|u| u.starts_with("_static_"))
553 {
554 return Err(Error::explain(
556 ErrorType::InternalError,
557 "Static route should be handled in request_filter",
558 ));
559 }
560
561 let upstream_name = ctx
562 .upstream
563 .as_ref()
564 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
565
566 trace!(
567 correlation_id = %ctx.trace_id,
568 upstream = %upstream_name,
569 "Looking up upstream pool"
570 );
571
572 let pool = self
573 .upstream_pools
574 .get(upstream_name)
575 .await
576 .ok_or_else(|| {
577 error!(
578 correlation_id = %ctx.trace_id,
579 upstream = %upstream_name,
580 "Upstream pool not found"
581 );
582 Error::explain(
583 ErrorType::InternalError,
584 format!("Upstream pool '{}' not found", upstream_name),
585 )
586 })?;
587
588 let max_retries = route_match
590 .config
591 .retry_policy
592 .as_ref()
593 .map(|r| r.max_attempts)
594 .unwrap_or(1);
595
596 trace!(
597 correlation_id = %ctx.trace_id,
598 upstream = %upstream_name,
599 max_retries = max_retries,
600 "Starting upstream peer selection"
601 );
602
603 let mut last_error = None;
604 let selection_start = std::time::Instant::now();
605
606 for attempt in 1..=max_retries {
607 ctx.upstream_attempts = attempt;
608
609 trace!(
610 correlation_id = %ctx.trace_id,
611 upstream = %upstream_name,
612 attempt = attempt,
613 max_retries = max_retries,
614 "Attempting to select upstream peer"
615 );
616
617 match pool.select_peer_with_metadata(None).await {
618 Ok((peer, metadata)) => {
619 let selection_duration = selection_start.elapsed();
620 let peer_addr = peer.address().to_string();
622 ctx.selected_upstream_address = Some(peer_addr.clone());
623
624 if metadata.contains_key("sticky_session_new") {
626 ctx.sticky_session_new_assignment = true;
627 ctx.sticky_session_set_cookie =
628 metadata.get("sticky_set_cookie_header").cloned();
629 ctx.sticky_target_index = metadata
630 .get("sticky_target_index")
631 .and_then(|s| s.parse().ok());
632
633 trace!(
634 correlation_id = %ctx.trace_id,
635 sticky_target_index = ?ctx.sticky_target_index,
636 "New sticky session assignment, will set cookie"
637 );
638 }
639
640 debug!(
641 correlation_id = %ctx.trace_id,
642 upstream = %upstream_name,
643 peer_address = %peer_addr,
644 attempt = attempt,
645 selection_duration_us = selection_duration.as_micros(),
646 sticky_session_hit = metadata.contains_key("sticky_session_hit"),
647 sticky_session_new = ctx.sticky_session_new_assignment,
648 "Selected upstream peer"
649 );
650 return Ok(Box::new(peer));
651 }
652 Err(e) => {
653 warn!(
654 correlation_id = %ctx.trace_id,
655 upstream = %upstream_name,
656 attempt = attempt,
657 max_retries = max_retries,
658 error = %e,
659 "Failed to select upstream peer"
660 );
661 last_error = Some(e);
662
663 if attempt < max_retries {
664 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
666 trace!(
667 correlation_id = %ctx.trace_id,
668 backoff_ms = backoff.as_millis(),
669 "Backing off before retry"
670 );
671 sleep(backoff).await;
672 }
673 }
674 }
675 }
676
677 let selection_duration = selection_start.elapsed();
678 error!(
679 correlation_id = %ctx.trace_id,
680 upstream = %upstream_name,
681 attempts = max_retries,
682 selection_duration_ms = selection_duration.as_millis(),
683 last_error = ?last_error,
684 "All upstream selection attempts failed"
685 );
686
687 if ctx.used_fallback() {
689 if let Some(metrics) = get_fallback_metrics() {
690 metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
691 }
692 }
693
694 Err(Error::explain(
695 ErrorType::InternalError,
696 format!("All upstream attempts failed: {:?}", last_error),
697 ))
698 }
699
700 async fn request_filter(
701 &self,
702 session: &mut Session,
703 ctx: &mut Self::CTX,
704 ) -> Result<bool, Box<Error>> {
705 trace!(
706 correlation_id = %ctx.trace_id,
707 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
708 "Starting request filter phase"
709 );
710
711 if let Some(route_id) = ctx.route_id.as_deref() {
714 if self.rate_limit_manager.has_route_limiter(route_id) {
715 let rate_result = self.rate_limit_manager.check(
716 route_id,
717 &ctx.client_ip,
718 &ctx.path,
719 Option::<&NoHeaderAccessor>::None,
720 );
721
722 if rate_result.limit > 0 {
724 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
725 limit: rate_result.limit,
726 remaining: rate_result.remaining,
727 reset_at: rate_result.reset_at,
728 });
729 }
730
731 if !rate_result.allowed {
732 use sentinel_config::RateLimitAction;
733
734 match rate_result.action {
735 RateLimitAction::Reject => {
736 warn!(
737 correlation_id = %ctx.trace_id,
738 route_id = route_id,
739 client_ip = %ctx.client_ip,
740 limiter = %rate_result.limiter,
741 limit = rate_result.limit,
742 remaining = rate_result.remaining,
743 "Request rate limited"
744 );
745 self.metrics.record_blocked_request("rate_limited");
746
747 let audit_entry = AuditLogEntry::rate_limited(
749 &ctx.trace_id,
750 &ctx.method,
751 &ctx.path,
752 &ctx.client_ip,
753 &rate_result.limiter,
754 )
755 .with_route_id(route_id)
756 .with_status_code(rate_result.status_code);
757 self.log_manager.log_audit(&audit_entry);
758
759 let body = rate_result
761 .message
762 .unwrap_or_else(|| "Rate limit exceeded".to_string());
763
764 let retry_after = rate_result.reset_at.saturating_sub(
766 std::time::SystemTime::now()
767 .duration_since(std::time::UNIX_EPOCH)
768 .unwrap_or_default()
769 .as_secs(),
770 );
771 crate::http_helpers::write_rate_limit_error(
772 session,
773 rate_result.status_code,
774 &body,
775 rate_result.limit,
776 rate_result.remaining,
777 rate_result.reset_at,
778 retry_after,
779 )
780 .await?;
781 return Ok(true); }
783 RateLimitAction::LogOnly => {
784 debug!(
785 correlation_id = %ctx.trace_id,
786 route_id = route_id,
787 "Rate limit exceeded (log only mode)"
788 );
789 }
791 RateLimitAction::Delay => {
792 if let Some(delay_ms) = rate_result.suggested_delay_ms {
794 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
796
797 if actual_delay > 0 {
798 debug!(
799 correlation_id = %ctx.trace_id,
800 route_id = route_id,
801 suggested_delay_ms = delay_ms,
802 max_delay_ms = rate_result.max_delay_ms,
803 actual_delay_ms = actual_delay,
804 "Applying rate limit delay"
805 );
806
807 tokio::time::sleep(std::time::Duration::from_millis(
808 actual_delay,
809 ))
810 .await;
811 }
812 }
813 }
815 }
816 }
817 }
818 }
819
820 if let Some(route_id) = ctx.route_id.as_deref() {
823 if let Some(ref route_config) = ctx.route_config {
824 if route_config.service_type == sentinel_config::ServiceType::Inference
825 && self.inference_rate_limit_manager.has_route(route_id)
826 {
827 let headers = &session.req_header().headers;
830
831 let body = ctx.body_buffer.as_slice();
833
834 let rate_limit_key = &ctx.client_ip;
836
837 if let Some(check_result) = self.inference_rate_limit_manager.check(
838 route_id,
839 rate_limit_key,
840 headers,
841 body,
842 ) {
843 ctx.inference_rate_limit_enabled = true;
845 ctx.inference_estimated_tokens = check_result.estimated_tokens;
846 ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
847 ctx.inference_model = check_result.model.clone();
848
849 if !check_result.is_allowed() {
850 let retry_after_ms = check_result.retry_after_ms();
851 let retry_after_secs = retry_after_ms.div_ceil(1000);
852
853 warn!(
854 correlation_id = %ctx.trace_id,
855 route_id = route_id,
856 client_ip = %ctx.client_ip,
857 estimated_tokens = check_result.estimated_tokens,
858 model = ?check_result.model,
859 retry_after_ms = retry_after_ms,
860 "Inference rate limit exceeded (tokens)"
861 );
862 self.metrics.record_blocked_request("inference_rate_limited");
863
864 let audit_entry = AuditLogEntry::new(
866 &ctx.trace_id,
867 AuditEventType::RateLimitExceeded,
868 &ctx.method,
869 &ctx.path,
870 &ctx.client_ip,
871 )
872 .with_route_id(route_id)
873 .with_status_code(429)
874 .with_reason(format!(
875 "Token rate limit exceeded: estimated {} tokens, model={:?}",
876 check_result.estimated_tokens, check_result.model
877 ));
878 self.log_manager.log_audit(&audit_entry);
879
880 let body = "Token rate limit exceeded";
882 let reset_at = std::time::SystemTime::now()
883 .duration_since(std::time::UNIX_EPOCH)
884 .unwrap_or_default()
885 .as_secs()
886 + retry_after_secs;
887
888 crate::http_helpers::write_rate_limit_error(
890 session,
891 429,
892 body,
893 0, 0, reset_at,
896 retry_after_secs,
897 )
898 .await?;
899 return Ok(true); }
901
902 trace!(
903 correlation_id = %ctx.trace_id,
904 route_id = route_id,
905 estimated_tokens = check_result.estimated_tokens,
906 model = ?check_result.model,
907 "Inference rate limit check passed"
908 );
909
910 if self.inference_rate_limit_manager.has_budget(route_id) {
912 ctx.inference_budget_enabled = true;
913
914 if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
915 route_id,
916 rate_limit_key,
917 check_result.estimated_tokens,
918 ) {
919 if !budget_result.is_allowed() {
920 let retry_after_secs = budget_result.retry_after_secs();
921
922 warn!(
923 correlation_id = %ctx.trace_id,
924 route_id = route_id,
925 client_ip = %ctx.client_ip,
926 estimated_tokens = check_result.estimated_tokens,
927 retry_after_secs = retry_after_secs,
928 "Token budget exhausted"
929 );
930
931 ctx.inference_budget_exhausted = true;
932 self.metrics.record_blocked_request("budget_exhausted");
933
934 let audit_entry = AuditLogEntry::new(
936 &ctx.trace_id,
937 AuditEventType::RateLimitExceeded,
938 &ctx.method,
939 &ctx.path,
940 &ctx.client_ip,
941 )
942 .with_route_id(route_id)
943 .with_status_code(429)
944 .with_reason("Token budget exhausted".to_string());
945 self.log_manager.log_audit(&audit_entry);
946
947 let body = "Token budget exhausted";
949 let reset_at = std::time::SystemTime::now()
950 .duration_since(std::time::UNIX_EPOCH)
951 .unwrap_or_default()
952 .as_secs()
953 + retry_after_secs;
954
955 crate::http_helpers::write_rate_limit_error(
956 session,
957 429,
958 body,
959 0,
960 0,
961 reset_at,
962 retry_after_secs,
963 )
964 .await?;
965 return Ok(true);
966 }
967
968 let remaining = match &budget_result {
970 sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
971 sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
972 _ => 0,
973 };
974 ctx.inference_budget_remaining = Some(remaining);
975
976 if let Some(status) = self.inference_rate_limit_manager.budget_status(
978 route_id,
979 rate_limit_key,
980 ) {
981 ctx.inference_budget_period_reset = Some(status.period_end);
982 }
983
984 trace!(
985 correlation_id = %ctx.trace_id,
986 route_id = route_id,
987 budget_remaining = remaining,
988 "Token budget check passed"
989 );
990 }
991 }
992
993 if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
995 ctx.inference_cost_enabled = true;
996 }
997 }
998 }
999 }
1000 }
1001
1002 if let Some(ref route_config) = ctx.route_config {
1004 if let Some(ref inference) = route_config.inference {
1005 if let Some(ref guardrails) = inference.guardrails {
1006 if let Some(ref pi_config) = guardrails.prompt_injection {
1007 if pi_config.enabled && !ctx.body_buffer.is_empty() {
1008 ctx.guardrails_enabled = true;
1009
1010 if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1012 let result = self
1013 .guardrail_processor
1014 .check_prompt_injection(
1015 pi_config,
1016 &content,
1017 ctx.inference_model.as_deref(),
1018 ctx.route_id.as_deref(),
1019 &ctx.trace_id,
1020 )
1021 .await;
1022
1023 match result {
1024 PromptInjectionResult::Blocked {
1025 status,
1026 message,
1027 detections,
1028 } => {
1029 warn!(
1030 correlation_id = %ctx.trace_id,
1031 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1032 detection_count = detections.len(),
1033 "Prompt injection detected, blocking request"
1034 );
1035
1036 self.metrics.record_blocked_request("prompt_injection");
1037
1038 ctx.guardrail_detection_categories = detections
1040 .iter()
1041 .map(|d| d.category.clone())
1042 .collect();
1043
1044 let audit_entry = AuditLogEntry::new(
1046 &ctx.trace_id,
1047 AuditEventType::Blocked,
1048 &ctx.method,
1049 &ctx.path,
1050 &ctx.client_ip,
1051 )
1052 .with_route_id(
1053 ctx.route_id.as_deref().unwrap_or("unknown"),
1054 )
1055 .with_status_code(status)
1056 .with_reason("Prompt injection detected".to_string());
1057 self.log_manager.log_audit(&audit_entry);
1058
1059 crate::http_helpers::write_json_error(
1061 session,
1062 status,
1063 "prompt_injection_blocked",
1064 Some(&message),
1065 )
1066 .await?;
1067 return Ok(true);
1068 }
1069 PromptInjectionResult::Detected { detections } => {
1070 warn!(
1072 correlation_id = %ctx.trace_id,
1073 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1074 detection_count = detections.len(),
1075 "Prompt injection detected (logged only)"
1076 );
1077 ctx.guardrail_detection_categories = detections
1078 .iter()
1079 .map(|d| d.category.clone())
1080 .collect();
1081 }
1082 PromptInjectionResult::Warning { detections } => {
1083 ctx.guardrail_warning = true;
1085 ctx.guardrail_detection_categories = detections
1086 .iter()
1087 .map(|d| d.category.clone())
1088 .collect();
1089 debug!(
1090 correlation_id = %ctx.trace_id,
1091 "Prompt injection warning set"
1092 );
1093 }
1094 PromptInjectionResult::Clean => {
1095 trace!(
1096 correlation_id = %ctx.trace_id,
1097 "No prompt injection detected"
1098 );
1099 }
1100 PromptInjectionResult::Error { message } => {
1101 trace!(
1103 correlation_id = %ctx.trace_id,
1104 error = %message,
1105 "Prompt injection check error (failure mode applied)"
1106 );
1107 }
1108 }
1109 }
1110 }
1111 }
1112 }
1113 }
1114 }
1115
1116 if let Some(route_id) = ctx.route_id.as_deref() {
1118 if let Some(ref route_config) = ctx.route_config {
1119 for filter_id in &route_config.filters {
1120 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1121 ctx.geo_country_code = result.country_code.clone();
1123 ctx.geo_lookup_performed = true;
1124
1125 if !result.allowed {
1126 warn!(
1127 correlation_id = %ctx.trace_id,
1128 route_id = route_id,
1129 client_ip = %ctx.client_ip,
1130 country = ?result.country_code,
1131 filter_id = %filter_id,
1132 "Request blocked by geo filter"
1133 );
1134 self.metrics.record_blocked_request("geo_blocked");
1135
1136 let audit_entry = AuditLogEntry::new(
1138 &ctx.trace_id,
1139 AuditEventType::Blocked,
1140 &ctx.method,
1141 &ctx.path,
1142 &ctx.client_ip,
1143 )
1144 .with_route_id(route_id)
1145 .with_status_code(result.status_code)
1146 .with_reason(format!(
1147 "Geo blocked: country={}, filter={}",
1148 result.country_code.as_deref().unwrap_or("unknown"),
1149 filter_id
1150 ));
1151 self.log_manager.log_audit(&audit_entry);
1152
1153 let body = result
1155 .block_message
1156 .unwrap_or_else(|| "Access denied".to_string());
1157
1158 crate::http_helpers::write_error(
1159 session,
1160 result.status_code,
1161 &body,
1162 "text/plain",
1163 )
1164 .await?;
1165 return Ok(true); }
1167
1168 break;
1170 }
1171 }
1172 }
1173 }
1174
1175 let is_websocket_upgrade = session
1177 .req_header()
1178 .headers
1179 .get(http::header::UPGRADE)
1180 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1181 .unwrap_or(false);
1182
1183 if is_websocket_upgrade {
1184 ctx.is_websocket_upgrade = true;
1185
1186 if let Some(ref route_config) = ctx.route_config {
1188 if !route_config.websocket {
1189 warn!(
1190 correlation_id = %ctx.trace_id,
1191 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1192 client_ip = %ctx.client_ip,
1193 "WebSocket upgrade rejected: not enabled for route"
1194 );
1195
1196 self.metrics.record_blocked_request("websocket_not_enabled");
1197
1198 let audit_entry = AuditLogEntry::new(
1200 &ctx.trace_id,
1201 AuditEventType::Blocked,
1202 &ctx.method,
1203 &ctx.path,
1204 &ctx.client_ip,
1205 )
1206 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1207 .with_action("websocket_rejected")
1208 .with_reason("WebSocket not enabled for route");
1209 self.log_manager.log_audit(&audit_entry);
1210
1211 crate::http_helpers::write_error(
1213 session,
1214 403,
1215 "WebSocket not enabled for this route",
1216 "text/plain",
1217 )
1218 .await?;
1219 return Ok(true); }
1221
1222 debug!(
1223 correlation_id = %ctx.trace_id,
1224 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1225 "WebSocket upgrade request allowed"
1226 );
1227
1228 if route_config.websocket_inspection {
1230 let has_compression = session
1232 .req_header()
1233 .headers
1234 .get("Sec-WebSocket-Extensions")
1235 .and_then(|v| v.to_str().ok())
1236 .map(|s| s.contains("permessage-deflate"))
1237 .unwrap_or(false);
1238
1239 if has_compression {
1240 debug!(
1241 correlation_id = %ctx.trace_id,
1242 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1243 "WebSocket inspection skipped: permessage-deflate negotiated"
1244 );
1245 ctx.websocket_skip_inspection = true;
1246 } else {
1247 ctx.websocket_inspection_enabled = true;
1248
1249 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1251 sentinel_agent_protocol::EventType::WebSocketFrame,
1252 );
1253
1254 debug!(
1255 correlation_id = %ctx.trace_id,
1256 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1257 agent_count = ctx.websocket_inspection_agents.len(),
1258 "WebSocket frame inspection enabled"
1259 );
1260 }
1261 }
1262 }
1263 }
1264
1265 if let Some(route_config) = ctx.route_config.clone() {
1268 if route_config.service_type == sentinel_config::ServiceType::Static {
1269 trace!(
1270 correlation_id = %ctx.trace_id,
1271 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1272 "Handling static file route"
1273 );
1274 let route_match = crate::routing::RouteMatch {
1276 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1277 config: route_config.clone(),
1278 };
1279 return self.handle_static_route(session, ctx, &route_match).await;
1280 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1281 trace!(
1282 correlation_id = %ctx.trace_id,
1283 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1284 builtin_handler = ?route_config.builtin_handler,
1285 "Handling builtin route"
1286 );
1287 let route_match = crate::routing::RouteMatch {
1289 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1290 config: route_config.clone(),
1291 };
1292 return self.handle_builtin_route(session, ctx, &route_match).await;
1293 }
1294 }
1295
1296 if let Some(route_id) = ctx.route_id.clone() {
1298 if let Some(validator) = self.validators.get(&route_id).await {
1299 trace!(
1300 correlation_id = %ctx.trace_id,
1301 route_id = %route_id,
1302 "Running API schema validation"
1303 );
1304 if let Some(result) = self
1305 .validate_api_request(session, ctx, &route_id, &validator)
1306 .await?
1307 {
1308 debug!(
1309 correlation_id = %ctx.trace_id,
1310 route_id = %route_id,
1311 validation_passed = result,
1312 "API validation complete"
1313 );
1314 return Ok(result);
1315 }
1316 }
1317 }
1318
1319 let client_addr = session
1321 .client_addr()
1322 .map(|a| format!("{}", a))
1323 .unwrap_or_else(|| "unknown".to_string());
1324 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1325
1326 let req_header = session.req_header_mut();
1327
1328 req_header
1330 .insert_header("X-Correlation-Id", &ctx.trace_id)
1331 .ok();
1332 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1333
1334 let config = ctx
1336 .config
1337 .get_or_insert_with(|| self.config_manager.current());
1338
1339 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
1344 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1345 && header_count > config.limits.max_header_count
1346 {
1347 warn!(
1348 correlation_id = %ctx.trace_id,
1349 header_count = header_count,
1350 limit = config.limits.max_header_count,
1351 "Request blocked: exceeds header count limit"
1352 );
1353
1354 self.metrics.record_blocked_request("header_count_exceeded");
1355 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1356 }
1357
1358 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1360 let total_header_size: usize = req_header
1361 .headers
1362 .iter()
1363 .map(|(k, v)| k.as_str().len() + v.len())
1364 .sum();
1365
1366 if total_header_size > config.limits.max_header_size_bytes {
1367 warn!(
1368 correlation_id = %ctx.trace_id,
1369 header_size = total_header_size,
1370 limit = config.limits.max_header_size_bytes,
1371 "Request blocked: exceeds header size limit"
1372 );
1373
1374 self.metrics.record_blocked_request("header_size_exceeded");
1375 return Err(Error::explain(
1376 ErrorType::InternalError,
1377 "Headers too large",
1378 ));
1379 }
1380 }
1381
1382 trace!(
1384 correlation_id = %ctx.trace_id,
1385 "Processing request through agents"
1386 );
1387 if let Err(e) = self
1388 .process_agents(session, ctx, &client_addr, client_port)
1389 .await
1390 {
1391 if let ErrorType::HTTPStatus(status) = e.etype() {
1394 let error_msg = e.to_string();
1396 let body = error_msg
1397 .split("context:")
1398 .nth(1)
1399 .map(|s| s.trim())
1400 .unwrap_or("Request blocked");
1401 debug!(
1402 correlation_id = %ctx.trace_id,
1403 status = status,
1404 body = %body,
1405 "Sending HTTP error response for agent block"
1406 );
1407 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1408 return Ok(true); }
1410 return Err(e);
1412 }
1413
1414 trace!(
1415 correlation_id = %ctx.trace_id,
1416 "Request filter phase complete, forwarding to upstream"
1417 );
1418
1419 Ok(false) }
1421
1422 async fn request_body_filter(
1429 &self,
1430 _session: &mut Session,
1431 body: &mut Option<Bytes>,
1432 end_of_stream: bool,
1433 ctx: &mut Self::CTX,
1434 ) -> Result<(), Box<Error>> {
1435 use sentinel_config::BodyStreamingMode;
1436
1437 if ctx.is_websocket_upgrade {
1439 if let Some(ref handler) = ctx.websocket_handler {
1440 let result = handler.process_client_data(body.take()).await;
1441 match result {
1442 crate::websocket::ProcessResult::Forward(data) => {
1443 *body = data;
1444 }
1445 crate::websocket::ProcessResult::Close(reason) => {
1446 warn!(
1447 correlation_id = %ctx.trace_id,
1448 code = reason.code,
1449 reason = %reason.reason,
1450 "WebSocket connection closed by agent (client->server)"
1451 );
1452 return Err(Error::explain(
1454 ErrorType::InternalError,
1455 format!("WebSocket closed: {} {}", reason.code, reason.reason),
1456 ));
1457 }
1458 }
1459 }
1460 return Ok(());
1462 }
1463
1464 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1466 if chunk_len > 0 {
1467 ctx.request_body_bytes += chunk_len as u64;
1468
1469 trace!(
1470 correlation_id = %ctx.trace_id,
1471 chunk_size = chunk_len,
1472 total_body_bytes = ctx.request_body_bytes,
1473 end_of_stream = end_of_stream,
1474 streaming_mode = ?ctx.request_body_streaming_mode,
1475 "Processing request body chunk"
1476 );
1477
1478 let config = ctx
1480 .config
1481 .get_or_insert_with(|| self.config_manager.current());
1482 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1483 warn!(
1484 correlation_id = %ctx.trace_id,
1485 body_bytes = ctx.request_body_bytes,
1486 limit = config.limits.max_body_size_bytes,
1487 "Request body size limit exceeded"
1488 );
1489 self.metrics.record_blocked_request("body_size_exceeded");
1490 return Err(Error::explain(
1491 ErrorType::InternalError,
1492 "Request body too large",
1493 ));
1494 }
1495 }
1496
1497 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1499 let config = ctx
1500 .config
1501 .get_or_insert_with(|| self.config_manager.current());
1502 let max_inspection_bytes = config
1503 .waf
1504 .as_ref()
1505 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1506 .unwrap_or(1024 * 1024);
1507
1508 match ctx.request_body_streaming_mode {
1509 BodyStreamingMode::Stream => {
1510 if body.is_some() {
1512 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1513 .await?;
1514 } else if end_of_stream && ctx.agent_needs_more {
1515 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1517 .await?;
1518 }
1519 }
1520 BodyStreamingMode::Hybrid { buffer_threshold } => {
1521 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1523 if let Some(ref chunk) = body {
1525 let bytes_to_buffer = std::cmp::min(
1526 chunk.len(),
1527 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1528 );
1529 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1530 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1531
1532 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1534 {
1535 self.send_buffered_body_to_agents(
1537 end_of_stream && chunk.len() == bytes_to_buffer,
1538 ctx,
1539 )
1540 .await?;
1541 ctx.body_buffer.clear();
1542
1543 if bytes_to_buffer < chunk.len() {
1545 let remaining = chunk.slice(bytes_to_buffer..);
1546 let mut remaining_body = Some(remaining);
1547 self.process_body_chunk_streaming(
1548 &mut remaining_body,
1549 end_of_stream,
1550 ctx,
1551 )
1552 .await?;
1553 }
1554 }
1555 }
1556 } else {
1557 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1559 .await?;
1560 }
1561 }
1562 BodyStreamingMode::Buffer => {
1563 if let Some(ref chunk) = body {
1565 if ctx.body_bytes_inspected < max_inspection_bytes {
1566 let bytes_to_inspect = std::cmp::min(
1567 chunk.len() as u64,
1568 max_inspection_bytes - ctx.body_bytes_inspected,
1569 ) as usize;
1570
1571 ctx.body_buffer
1572 .extend_from_slice(&chunk[..bytes_to_inspect]);
1573 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1574
1575 trace!(
1576 correlation_id = %ctx.trace_id,
1577 bytes_inspected = ctx.body_bytes_inspected,
1578 max_inspection_bytes = max_inspection_bytes,
1579 buffer_size = ctx.body_buffer.len(),
1580 "Buffering body for agent inspection"
1581 );
1582 }
1583 }
1584
1585 let should_send =
1587 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1588 if should_send && !ctx.body_buffer.is_empty() {
1589 self.send_buffered_body_to_agents(end_of_stream, ctx)
1590 .await?;
1591 ctx.body_buffer.clear();
1592 }
1593 }
1594 }
1595 }
1596
1597 if end_of_stream {
1598 trace!(
1599 correlation_id = %ctx.trace_id,
1600 total_body_bytes = ctx.request_body_bytes,
1601 bytes_inspected = ctx.body_bytes_inspected,
1602 "Request body complete"
1603 );
1604 }
1605
1606 Ok(())
1607 }
1608
1609 async fn response_filter(
1610 &self,
1611 _session: &mut Session,
1612 upstream_response: &mut ResponseHeader,
1613 ctx: &mut Self::CTX,
1614 ) -> Result<(), Box<Error>> {
1615 let status = upstream_response.status.as_u16();
1616 let duration = ctx.elapsed();
1617
1618 trace!(
1619 correlation_id = %ctx.trace_id,
1620 status = status,
1621 "Starting response filter phase"
1622 );
1623
1624 if status == 101 && ctx.is_websocket_upgrade {
1626 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1627 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1629 self.agent_manager.clone(),
1630 ctx.route_id
1631 .clone()
1632 .unwrap_or_else(|| "unknown".to_string()),
1633 ctx.trace_id.clone(),
1634 ctx.client_ip.clone(),
1635 100, Some(self.metrics.clone()),
1637 );
1638
1639 let handler = crate::websocket::WebSocketHandler::new(
1640 std::sync::Arc::new(inspector),
1641 1024 * 1024, );
1643
1644 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1645
1646 info!(
1647 correlation_id = %ctx.trace_id,
1648 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1649 agent_count = ctx.websocket_inspection_agents.len(),
1650 "WebSocket upgrade successful, frame inspection enabled"
1651 );
1652 } else if ctx.websocket_skip_inspection {
1653 debug!(
1654 correlation_id = %ctx.trace_id,
1655 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1656 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1657 );
1658 } else {
1659 debug!(
1660 correlation_id = %ctx.trace_id,
1661 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1662 "WebSocket upgrade successful"
1663 );
1664 }
1665 }
1666
1667 trace!(
1669 correlation_id = %ctx.trace_id,
1670 "Applying security headers"
1671 );
1672 self.apply_security_headers(upstream_response).ok();
1673
1674 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1676
1677 if let Some(ref rate_info) = ctx.rate_limit_info {
1679 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1680 upstream_response
1681 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1682 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1683 }
1684
1685 if ctx.inference_budget_enabled {
1687 if let Some(remaining) = ctx.inference_budget_remaining {
1688 upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1689 }
1690 if let Some(period_reset) = ctx.inference_budget_period_reset {
1691 let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1693 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1694 .unwrap_or_else(|| period_reset.to_string());
1695 upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1696 }
1697 }
1698
1699 if let Some(ref country_code) = ctx.geo_country_code {
1701 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1702 }
1703
1704 if ctx.sticky_session_new_assignment {
1706 if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1707 upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1708 trace!(
1709 correlation_id = %ctx.trace_id,
1710 sticky_target_index = ?ctx.sticky_target_index,
1711 "Added sticky session Set-Cookie header"
1712 );
1713 }
1714 }
1715
1716 if ctx.guardrail_warning {
1718 upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1719 }
1720
1721 if ctx.used_fallback() {
1723 upstream_response.insert_header("X-Fallback-Used", "true")?;
1724
1725 if let Some(ref upstream) = ctx.upstream {
1726 upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1727 }
1728
1729 if let Some(ref reason) = ctx.fallback_reason {
1730 upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1731 }
1732
1733 if let Some(ref original) = ctx.original_upstream {
1734 upstream_response.insert_header("X-Original-Upstream", original)?;
1735 }
1736
1737 if let Some(ref mapping) = ctx.model_mapping_applied {
1738 upstream_response.insert_header(
1739 "X-Model-Mapping",
1740 format!("{} -> {}", mapping.0, mapping.1),
1741 )?;
1742 }
1743
1744 trace!(
1745 correlation_id = %ctx.trace_id,
1746 fallback_attempt = ctx.fallback_attempt,
1747 fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1748 original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1749 "Added fallback response headers"
1750 );
1751
1752 if status < 400 {
1754 if let Some(metrics) = get_fallback_metrics() {
1755 metrics.record_fallback_success(
1756 ctx.route_id.as_deref().unwrap_or("unknown"),
1757 ctx.upstream.as_deref().unwrap_or("unknown"),
1758 );
1759 }
1760 }
1761 }
1762
1763 if ctx.inference_rate_limit_enabled {
1765 let content_type = upstream_response
1767 .headers
1768 .get("content-type")
1769 .and_then(|ct| ct.to_str().ok());
1770
1771 if is_sse_response(content_type) {
1772 let provider = ctx
1774 .route_config
1775 .as_ref()
1776 .and_then(|r| r.inference.as_ref())
1777 .map(|i| i.provider.clone())
1778 .unwrap_or_default();
1779
1780 ctx.inference_streaming_response = true;
1781 ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1782 provider,
1783 ctx.inference_model.clone(),
1784 ));
1785
1786 trace!(
1787 correlation_id = %ctx.trace_id,
1788 content_type = ?content_type,
1789 model = ?ctx.inference_model,
1790 "Initialized streaming token counter for SSE response"
1791 );
1792 }
1793 }
1794
1795 if status >= 400 {
1797 trace!(
1798 correlation_id = %ctx.trace_id,
1799 status = status,
1800 "Handling error response"
1801 );
1802 self.handle_error_response(upstream_response, ctx).await?;
1803 }
1804
1805 self.metrics.record_request(
1807 ctx.route_id.as_deref().unwrap_or("unknown"),
1808 &ctx.method,
1809 status,
1810 duration,
1811 );
1812
1813 if let Some(ref mut span) = ctx.otel_span {
1815 span.set_status(status);
1816 if let Some(ref upstream) = ctx.upstream {
1817 span.set_upstream(upstream, "");
1818 }
1819 if status >= 500 {
1820 span.record_error(&format!("HTTP {}", status));
1821 }
1822 }
1823
1824 if let Some(ref upstream) = ctx.upstream {
1826 let success = status < 500;
1827
1828 trace!(
1829 correlation_id = %ctx.trace_id,
1830 upstream = %upstream,
1831 success = success,
1832 status = status,
1833 "Recording passive health check result"
1834 );
1835
1836 let error_msg = if !success {
1837 Some(format!("HTTP {}", status))
1838 } else {
1839 None
1840 };
1841 self.passive_health
1842 .record_outcome(upstream, success, error_msg.as_deref())
1843 .await;
1844
1845 if let Some(pool) = self.upstream_pools.get(upstream).await {
1847 pool.report_result(upstream, success).await;
1848 }
1849
1850 if !success {
1851 warn!(
1852 correlation_id = %ctx.trace_id,
1853 upstream = %upstream,
1854 status = status,
1855 "Upstream returned error status"
1856 );
1857 }
1858 }
1859
1860 if status >= 500 {
1862 error!(
1863 correlation_id = %ctx.trace_id,
1864 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1865 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1866 method = %ctx.method,
1867 path = %ctx.path,
1868 status = status,
1869 duration_ms = duration.as_millis(),
1870 attempts = ctx.upstream_attempts,
1871 "Request completed with server error"
1872 );
1873 } else if status >= 400 {
1874 warn!(
1875 correlation_id = %ctx.trace_id,
1876 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1877 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1878 method = %ctx.method,
1879 path = %ctx.path,
1880 status = status,
1881 duration_ms = duration.as_millis(),
1882 "Request completed with client error"
1883 );
1884 } else {
1885 debug!(
1886 correlation_id = %ctx.trace_id,
1887 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1888 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1889 method = %ctx.method,
1890 path = %ctx.path,
1891 status = status,
1892 duration_ms = duration.as_millis(),
1893 attempts = ctx.upstream_attempts,
1894 "Request completed"
1895 );
1896 }
1897
1898 Ok(())
1899 }
1900
1901 async fn upstream_request_filter(
1904 &self,
1905 _session: &mut Session,
1906 upstream_request: &mut pingora::http::RequestHeader,
1907 ctx: &mut Self::CTX,
1908 ) -> Result<()>
1909 where
1910 Self::CTX: Send + Sync,
1911 {
1912 trace!(
1913 correlation_id = %ctx.trace_id,
1914 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1915 "Applying upstream request modifications"
1916 );
1917
1918 upstream_request
1920 .insert_header("X-Trace-Id", &ctx.trace_id)
1921 .ok();
1922
1923 if let Some(ref span) = ctx.otel_span {
1925 let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1926 let traceparent =
1927 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1928 upstream_request
1929 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1930 .ok();
1931 }
1932
1933 upstream_request
1935 .insert_header("X-Forwarded-By", "Sentinel")
1936 .ok();
1937
1938 if let Some(ref route_config) = ctx.route_config {
1941 let mods = route_config.policies.request_headers.clone();
1942
1943 for (name, value) in mods.set {
1945 upstream_request.insert_header(name, value).ok();
1946 }
1947
1948 for (name, value) in mods.add {
1950 upstream_request.append_header(name, value).ok();
1951 }
1952
1953 for name in &mods.remove {
1955 upstream_request.remove_header(name);
1956 }
1957
1958 trace!(
1959 correlation_id = %ctx.trace_id,
1960 "Applied request header modifications"
1961 );
1962 }
1963
1964 upstream_request.remove_header("X-Internal-Token");
1966 upstream_request.remove_header("Authorization-Internal");
1967
1968 if let Some(ref route_config) = ctx.route_config {
1971 if let Some(ref shadow_config) = route_config.shadow {
1972 let pools_snapshot = self.upstream_pools.snapshot().await;
1974 let upstream_pools = std::sync::Arc::new(pools_snapshot);
1975
1976 let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1978
1979 let shadow_manager = crate::shadow::ShadowManager::new(
1981 upstream_pools,
1982 shadow_config.clone(),
1983 Some(std::sync::Arc::clone(&self.metrics)),
1984 route_id,
1985 );
1986
1987 if shadow_manager.should_shadow(upstream_request) {
1989 trace!(
1990 correlation_id = %ctx.trace_id,
1991 shadow_upstream = %shadow_config.upstream,
1992 percentage = shadow_config.percentage,
1993 "Shadowing request"
1994 );
1995
1996 let shadow_headers = upstream_request.clone();
1998
1999 let shadow_ctx = crate::upstream::RequestContext {
2001 client_ip: ctx.client_ip.parse().ok(),
2002 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
2004 method: ctx.method.clone(),
2005 };
2006
2007 let buffer_body = shadow_config.buffer_body
2009 && crate::shadow::should_buffer_method(&ctx.method);
2010
2011 if buffer_body {
2012 trace!(
2016 correlation_id = %ctx.trace_id,
2017 "Deferring shadow request until body is buffered"
2018 );
2019 ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2020 headers: shadow_headers,
2021 manager: std::sync::Arc::new(shadow_manager),
2022 request_ctx: shadow_ctx,
2023 include_body: true,
2024 });
2025 if !ctx.body_inspection_enabled {
2028 ctx.body_inspection_enabled = true;
2029 }
2032 } else {
2033 shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2035 ctx.shadow_sent = true;
2036 }
2037 }
2038 }
2039 }
2040
2041 Ok(())
2042 }
2043
2044 fn response_body_filter(
2050 &self,
2051 _session: &mut Session,
2052 body: &mut Option<Bytes>,
2053 end_of_stream: bool,
2054 ctx: &mut Self::CTX,
2055 ) -> Result<Option<Duration>, Box<Error>> {
2056 if ctx.is_websocket_upgrade {
2059 if let Some(ref handler) = ctx.websocket_handler {
2060 let handler = handler.clone();
2061 let data = body.take();
2062
2063 let result = tokio::task::block_in_place(|| {
2066 tokio::runtime::Handle::current()
2067 .block_on(async { handler.process_server_data(data).await })
2068 });
2069
2070 match result {
2071 crate::websocket::ProcessResult::Forward(data) => {
2072 *body = data;
2073 }
2074 crate::websocket::ProcessResult::Close(reason) => {
2075 warn!(
2076 correlation_id = %ctx.trace_id,
2077 code = reason.code,
2078 reason = %reason.reason,
2079 "WebSocket connection closed by agent (server->client)"
2080 );
2081 let close_frame =
2084 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2085 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2086 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2087 *body = Some(Bytes::from(encoded));
2088 }
2089 }
2090 }
2091 }
2092 return Ok(None);
2094 }
2095
2096 if let Some(ref chunk) = body {
2098 ctx.response_bytes += chunk.len() as u64;
2099
2100 trace!(
2101 correlation_id = %ctx.trace_id,
2102 chunk_size = chunk.len(),
2103 total_response_bytes = ctx.response_bytes,
2104 end_of_stream = end_of_stream,
2105 "Processing response body chunk"
2106 );
2107
2108 if let Some(ref mut counter) = ctx.inference_streaming_counter {
2110 let result = counter.process_chunk(chunk);
2111
2112 if result.content.is_some() || result.is_done {
2113 trace!(
2114 correlation_id = %ctx.trace_id,
2115 has_content = result.content.is_some(),
2116 is_done = result.is_done,
2117 chunks_processed = counter.chunks_processed(),
2118 accumulated_content_len = counter.content().len(),
2119 "Processed SSE chunk for token counting"
2120 );
2121 }
2122 }
2123
2124 if ctx.response_body_inspection_enabled
2128 && !ctx.response_body_inspection_agents.is_empty()
2129 {
2130 let config = ctx
2131 .config
2132 .get_or_insert_with(|| self.config_manager.current());
2133 let max_inspection_bytes = config
2134 .waf
2135 .as_ref()
2136 .map(|w| w.body_inspection.max_inspection_bytes as u64)
2137 .unwrap_or(1024 * 1024);
2138
2139 if ctx.response_body_bytes_inspected < max_inspection_bytes {
2140 let bytes_to_inspect = std::cmp::min(
2141 chunk.len() as u64,
2142 max_inspection_bytes - ctx.response_body_bytes_inspected,
2143 ) as usize;
2144
2145 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2149 ctx.response_body_chunk_index += 1;
2150
2151 trace!(
2152 correlation_id = %ctx.trace_id,
2153 bytes_inspected = ctx.response_body_bytes_inspected,
2154 max_inspection_bytes = max_inspection_bytes,
2155 chunk_index = ctx.response_body_chunk_index,
2156 "Tracking response body for inspection"
2157 );
2158 }
2159 }
2160 }
2161
2162 if end_of_stream {
2163 trace!(
2164 correlation_id = %ctx.trace_id,
2165 total_response_bytes = ctx.response_bytes,
2166 response_bytes_inspected = ctx.response_body_bytes_inspected,
2167 "Response body complete"
2168 );
2169 }
2170
2171 Ok(None)
2173 }
2174
2175 async fn connected_to_upstream(
2178 &self,
2179 _session: &mut Session,
2180 reused: bool,
2181 peer: &HttpPeer,
2182 #[cfg(unix)] _fd: RawFd,
2183 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2184 digest: Option<&Digest>,
2185 ctx: &mut Self::CTX,
2186 ) -> Result<(), Box<Error>> {
2187 ctx.connection_reused = reused;
2189
2190 if reused {
2192 trace!(
2193 correlation_id = %ctx.trace_id,
2194 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2195 peer_address = %peer.address(),
2196 "Reusing existing upstream connection"
2197 );
2198 } else {
2199 debug!(
2200 correlation_id = %ctx.trace_id,
2201 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2202 peer_address = %peer.address(),
2203 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2204 "Established new upstream connection"
2205 );
2206 }
2207
2208 Ok(())
2209 }
2210
2211 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2221 let route_id = match ctx.route_id.as_deref() {
2223 Some(id) => id,
2224 None => {
2225 trace!(
2226 correlation_id = %ctx.trace_id,
2227 "Cache filter: no route ID, skipping cache"
2228 );
2229 return Ok(());
2230 }
2231 };
2232
2233 if !self.cache_manager.is_enabled(route_id) {
2235 trace!(
2236 correlation_id = %ctx.trace_id,
2237 route_id = %route_id,
2238 "Cache disabled for route"
2239 );
2240 return Ok(());
2241 }
2242
2243 if !self
2245 .cache_manager
2246 .is_method_cacheable(route_id, &ctx.method)
2247 {
2248 trace!(
2249 correlation_id = %ctx.trace_id,
2250 route_id = %route_id,
2251 method = %ctx.method,
2252 "Method not cacheable"
2253 );
2254 return Ok(());
2255 }
2256
2257 debug!(
2259 correlation_id = %ctx.trace_id,
2260 route_id = %route_id,
2261 method = %ctx.method,
2262 path = %ctx.path,
2263 "Enabling HTTP caching for request"
2264 );
2265
2266 let storage = get_cache_storage();
2268 let eviction = get_cache_eviction();
2269 let cache_lock = get_cache_lock();
2270
2271 session.cache.enable(
2273 storage,
2274 Some(eviction),
2275 None, Some(cache_lock),
2277 None, );
2279
2280 ctx.cache_eligible = true;
2282
2283 trace!(
2284 correlation_id = %ctx.trace_id,
2285 route_id = %route_id,
2286 cache_enabled = session.cache.enabled(),
2287 "Cache enabled for request"
2288 );
2289
2290 Ok(())
2291 }
2292
2293 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2298 let req_header = session.req_header();
2299 let method = req_header.method.as_str();
2300 let path = req_header.uri.path();
2301 let host = ctx.host.as_deref().unwrap_or("unknown");
2302 let query = req_header.uri.query();
2303
2304 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2306
2307 trace!(
2308 correlation_id = %ctx.trace_id,
2309 cache_key = %key_string,
2310 "Generated cache key"
2311 );
2312
2313 Ok(CacheKey::default(req_header))
2316 }
2317
2318 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2323 session.cache.cache_miss();
2325
2326 if let Some(route_id) = ctx.route_id.as_deref() {
2328 self.cache_manager.stats().record_miss();
2329
2330 trace!(
2331 correlation_id = %ctx.trace_id,
2332 route_id = %route_id,
2333 path = %ctx.path,
2334 "Cache miss"
2335 );
2336 }
2337 }
2338
2339 async fn cache_hit_filter(
2345 &self,
2346 session: &mut Session,
2347 meta: &CacheMeta,
2348 _hit_handler: &mut HitHandler,
2349 is_fresh: bool,
2350 ctx: &mut Self::CTX,
2351 ) -> Result<Option<ForcedInvalidationKind>>
2352 where
2353 Self::CTX: Send + Sync,
2354 {
2355 let req_header = session.req_header();
2357 let method = req_header.method.as_str();
2358 let path = req_header.uri.path();
2359 let host = req_header.uri.host().unwrap_or("localhost");
2360 let query = req_header.uri.query();
2361
2362 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2364
2365 if self.cache_manager.should_invalidate(&cache_key) {
2367 info!(
2368 correlation_id = %ctx.trace_id,
2369 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2370 cache_key = %cache_key,
2371 "Cache entry invalidated by purge request"
2372 );
2373 return Ok(Some(ForcedInvalidationKind::ForceExpired));
2375 }
2376
2377 if is_fresh {
2379 self.cache_manager.stats().record_hit();
2380
2381 debug!(
2382 correlation_id = %ctx.trace_id,
2383 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2384 is_fresh = is_fresh,
2385 "Cache hit (fresh)"
2386 );
2387 } else {
2388 trace!(
2389 correlation_id = %ctx.trace_id,
2390 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2391 is_fresh = is_fresh,
2392 "Cache hit (stale)"
2393 );
2394 }
2395
2396 Ok(None)
2398 }
2399
2400 fn response_cache_filter(
2405 &self,
2406 _session: &Session,
2407 resp: &ResponseHeader,
2408 ctx: &mut Self::CTX,
2409 ) -> Result<RespCacheable> {
2410 let route_id = match ctx.route_id.as_deref() {
2411 Some(id) => id,
2412 None => {
2413 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2414 "no_route",
2415 )));
2416 }
2417 };
2418
2419 if !self.cache_manager.is_enabled(route_id) {
2421 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2422 "disabled",
2423 )));
2424 }
2425
2426 let status = resp.status.as_u16();
2427
2428 if !self.cache_manager.is_status_cacheable(route_id, status) {
2430 trace!(
2431 correlation_id = %ctx.trace_id,
2432 route_id = %route_id,
2433 status = status,
2434 "Status code not cacheable"
2435 );
2436 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2437 }
2438
2439 if let Some(cache_control) = resp.headers.get("cache-control") {
2441 if let Ok(cc_str) = cache_control.to_str() {
2442 if crate::cache::CacheManager::is_no_cache(cc_str) {
2443 trace!(
2444 correlation_id = %ctx.trace_id,
2445 route_id = %route_id,
2446 cache_control = %cc_str,
2447 "Response has no-cache directive"
2448 );
2449 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2450 }
2451 }
2452 }
2453
2454 let cache_control = resp
2456 .headers
2457 .get("cache-control")
2458 .and_then(|v| v.to_str().ok());
2459 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2460
2461 if ttl.is_zero() {
2462 trace!(
2463 correlation_id = %ctx.trace_id,
2464 route_id = %route_id,
2465 "TTL is zero, not caching"
2466 );
2467 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2468 }
2469
2470 let config = self
2472 .cache_manager
2473 .get_route_config(route_id)
2474 .unwrap_or_default();
2475
2476 let now = std::time::SystemTime::now();
2478 let fresh_until = now + ttl;
2479
2480 let header = resp.clone();
2482
2483 let cache_meta = CacheMeta::new(
2485 fresh_until,
2486 now,
2487 config.stale_while_revalidate_secs as u32,
2488 config.stale_if_error_secs as u32,
2489 header,
2490 );
2491
2492 self.cache_manager.stats().record_store();
2494
2495 debug!(
2496 correlation_id = %ctx.trace_id,
2497 route_id = %route_id,
2498 status = status,
2499 ttl_secs = ttl.as_secs(),
2500 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2501 stale_if_error_secs = config.stale_if_error_secs,
2502 "Caching response"
2503 );
2504
2505 Ok(RespCacheable::Cacheable(cache_meta))
2506 }
2507
2508 fn should_serve_stale(
2512 &self,
2513 _session: &mut Session,
2514 ctx: &mut Self::CTX,
2515 error: Option<&Error>,
2516 ) -> bool {
2517 let route_id = match ctx.route_id.as_deref() {
2518 Some(id) => id,
2519 None => return false,
2520 };
2521
2522 let config = match self.cache_manager.get_route_config(route_id) {
2524 Some(c) => c,
2525 None => return false,
2526 };
2527
2528 if let Some(e) = error {
2530 if e.esource() == &pingora::ErrorSource::Upstream {
2532 debug!(
2533 correlation_id = %ctx.trace_id,
2534 route_id = %route_id,
2535 error = %e,
2536 stale_if_error_secs = config.stale_if_error_secs,
2537 "Considering stale-if-error"
2538 );
2539 return config.stale_if_error_secs > 0;
2540 }
2541 }
2542
2543 if error.is_none() && config.stale_while_revalidate_secs > 0 {
2545 trace!(
2546 correlation_id = %ctx.trace_id,
2547 route_id = %route_id,
2548 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2549 "Allowing stale-while-revalidate"
2550 );
2551 return true;
2552 }
2553
2554 false
2555 }
2556
2557 fn range_header_filter(
2567 &self,
2568 session: &mut Session,
2569 response: &mut ResponseHeader,
2570 ctx: &mut Self::CTX,
2571 ) -> pingora_proxy::RangeType
2572 where
2573 Self::CTX: Send + Sync,
2574 {
2575 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2577 matches!(
2579 config.service_type,
2580 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2581 )
2582 });
2583
2584 if !supports_range {
2585 trace!(
2586 correlation_id = %ctx.trace_id,
2587 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2588 "Range request not supported for this route type"
2589 );
2590 return pingora_proxy::RangeType::None;
2591 }
2592
2593 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
2595
2596 match &range_type {
2597 pingora_proxy::RangeType::None => {
2598 trace!(
2599 correlation_id = %ctx.trace_id,
2600 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2601 "No range request or not applicable"
2602 );
2603 }
2604 pingora_proxy::RangeType::Single(range) => {
2605 trace!(
2606 correlation_id = %ctx.trace_id,
2607 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2608 range_start = range.start,
2609 range_end = range.end,
2610 "Processing single-range request"
2611 );
2612 }
2613 pingora_proxy::RangeType::Multi(multi) => {
2614 trace!(
2615 correlation_id = %ctx.trace_id,
2616 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2617 range_count = multi.ranges.len(),
2618 "Processing multi-range request"
2619 );
2620 }
2621 pingora_proxy::RangeType::Invalid => {
2622 debug!(
2623 correlation_id = %ctx.trace_id,
2624 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2625 "Invalid range header"
2626 );
2627 }
2628 }
2629
2630 range_type
2631 }
2632
2633 async fn fail_to_proxy(
2636 &self,
2637 session: &mut Session,
2638 e: &Error,
2639 ctx: &mut Self::CTX,
2640 ) -> pingora_proxy::FailToProxy
2641 where
2642 Self::CTX: Send + Sync,
2643 {
2644 let error_code = match e.etype() {
2645 ErrorType::ConnectRefused => 503,
2647 ErrorType::ConnectTimedout => 504,
2648 ErrorType::ConnectNoRoute => 502,
2649
2650 ErrorType::ReadTimedout => 504,
2652 ErrorType::WriteTimedout => 504,
2653
2654 ErrorType::TLSHandshakeFailure => 502,
2656 ErrorType::InvalidCert => 502,
2657
2658 ErrorType::InvalidHTTPHeader => 400,
2660 ErrorType::H2Error => 502,
2661
2662 ErrorType::ConnectProxyFailure => 502,
2664 ErrorType::ConnectionClosed => 502,
2665
2666 ErrorType::HTTPStatus(status) => *status,
2668
2669 ErrorType::InternalError => {
2671 let error_str = e.to_string();
2673 if error_str.contains("upstream")
2674 || error_str.contains("DNS")
2675 || error_str.contains("resolve")
2676 {
2677 502
2678 } else {
2679 500
2680 }
2681 }
2682
2683 _ => 502,
2685 };
2686
2687 error!(
2688 correlation_id = %ctx.trace_id,
2689 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2690 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2691 error_type = ?e.etype(),
2692 error = %e,
2693 error_code = error_code,
2694 "Proxy error occurred"
2695 );
2696
2697 self.metrics
2699 .record_blocked_request(&format!("proxy_error_{}", error_code));
2700
2701 let error_message = match error_code {
2705 400 => "Bad Request",
2706 502 => "Bad Gateway",
2707 503 => "Service Unavailable",
2708 504 => "Gateway Timeout",
2709 _ => "Internal Server Error",
2710 };
2711
2712 let body = format!(
2714 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2715 error_code, error_message, ctx.trace_id
2716 );
2717
2718 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2720 header
2721 .insert_header("Content-Type", "application/json")
2722 .ok();
2723 header
2724 .insert_header("Content-Length", body.len().to_string())
2725 .ok();
2726 header
2727 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2728 .ok();
2729 header.insert_header("Connection", "close").ok();
2730
2731 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2733 warn!(
2734 correlation_id = %ctx.trace_id,
2735 error = %write_err,
2736 "Failed to write error response header"
2737 );
2738 } else {
2739 if let Err(write_err) = session
2741 .write_response_body(Some(bytes::Bytes::from(body)), true)
2742 .await
2743 {
2744 warn!(
2745 correlation_id = %ctx.trace_id,
2746 error = %write_err,
2747 "Failed to write error response body"
2748 );
2749 }
2750 }
2751
2752 pingora_proxy::FailToProxy {
2755 error_code,
2756 can_reuse_downstream: false,
2757 }
2758 }
2759
2760 fn error_while_proxy(
2766 &self,
2767 peer: &HttpPeer,
2768 session: &mut Session,
2769 e: Box<Error>,
2770 ctx: &mut Self::CTX,
2771 client_reused: bool,
2772 ) -> Box<Error> {
2773 let error_type = e.etype().clone();
2774 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2775
2776 let is_retryable = matches!(
2778 error_type,
2779 ErrorType::ConnectTimedout
2780 | ErrorType::ReadTimedout
2781 | ErrorType::WriteTimedout
2782 | ErrorType::ConnectionClosed
2783 | ErrorType::ConnectRefused
2784 );
2785
2786 warn!(
2788 correlation_id = %ctx.trace_id,
2789 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2790 upstream = %upstream_id,
2791 peer_address = %peer.address(),
2792 error_type = ?error_type,
2793 error = %e,
2794 client_reused = client_reused,
2795 is_retryable = is_retryable,
2796 "Error during proxy operation"
2797 );
2798
2799 let peer_address = peer.address().to_string();
2802 let upstream_pools = self.upstream_pools.clone();
2803 let upstream_id_owned = upstream_id.to_string();
2804 tokio::spawn(async move {
2805 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2806 pool.report_result(&peer_address, false).await;
2807 }
2808 });
2809
2810 self.metrics
2812 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2813
2814 let mut enhanced_error = e.more_context(format!(
2816 "Upstream: {}, Peer: {}, Attempts: {}",
2817 upstream_id,
2818 peer.address(),
2819 ctx.upstream_attempts
2820 ));
2821
2822 if is_retryable {
2827 let can_retry = if client_reused {
2828 !session.as_ref().retry_buffer_truncated()
2830 } else {
2831 true
2833 };
2834
2835 enhanced_error.retry.decide_reuse(can_retry);
2836
2837 if can_retry {
2838 debug!(
2839 correlation_id = %ctx.trace_id,
2840 upstream = %upstream_id,
2841 error_type = ?error_type,
2842 "Error is retryable, will attempt retry"
2843 );
2844 }
2845 } else {
2846 enhanced_error.retry.decide_reuse(false);
2848 }
2849
2850 enhanced_error
2851 }
2852
2853 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2854 self.reload_coordinator.dec_requests();
2856
2857 if !ctx.shadow_sent {
2859 if let Some(shadow_pending) = ctx.shadow_pending.take() {
2860 let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2861 Some(ctx.body_buffer.clone())
2863 } else {
2864 None
2865 };
2866
2867 trace!(
2868 correlation_id = %ctx.trace_id,
2869 body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2870 "Firing deferred shadow request with buffered body"
2871 );
2872
2873 shadow_pending.manager.shadow_request(
2874 shadow_pending.headers,
2875 body,
2876 shadow_pending.request_ctx,
2877 );
2878 ctx.shadow_sent = true;
2879 }
2880 }
2881
2882 let duration = ctx.elapsed();
2883
2884 let status = session
2886 .response_written()
2887 .map(|r| r.status.as_u16())
2888 .unwrap_or(0);
2889
2890 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2893 (&ctx.selected_upstream_address, &ctx.upstream)
2894 {
2895 let success = status > 0 && status < 500;
2897
2898 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2899 pool.report_result_with_latency(peer_addr, success, Some(duration))
2900 .await;
2901 trace!(
2902 correlation_id = %ctx.trace_id,
2903 upstream = %upstream_id,
2904 peer_address = %peer_addr,
2905 success = success,
2906 duration_ms = duration.as_millis(),
2907 status = status,
2908 "Reported result to adaptive load balancer"
2909 );
2910 }
2911
2912 if ctx.inference_rate_limit_enabled && success {
2914 let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2915 if cold_detected {
2916 debug!(
2917 correlation_id = %ctx.trace_id,
2918 upstream = %upstream_id,
2919 peer_address = %peer_addr,
2920 duration_ms = duration.as_millis(),
2921 "Cold model detected on inference upstream"
2922 );
2923 }
2924 }
2925 }
2926
2927 if ctx.inference_rate_limit_enabled {
2930 if let (Some(route_id), Some(ref rate_limit_key)) =
2931 (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2932 {
2933 let response_headers = session
2935 .response_written()
2936 .map(|r| &r.headers)
2937 .cloned()
2938 .unwrap_or_default();
2939
2940 let streaming_result = if ctx.inference_streaming_response {
2942 ctx.inference_streaming_counter
2943 .as_ref()
2944 .map(|counter| counter.finalize())
2945 } else {
2946 None
2947 };
2948
2949 if let Some(ref result) = streaming_result {
2951 debug!(
2952 correlation_id = %ctx.trace_id,
2953 output_tokens = result.output_tokens,
2954 input_tokens = ?result.input_tokens,
2955 source = ?result.source,
2956 content_length = result.content_length,
2957 "Finalized streaming token count"
2958 );
2959 }
2960
2961 if ctx.inference_streaming_response {
2963 if let Some(ref route_config) = ctx.route_config {
2964 if let Some(ref inference) = route_config.inference {
2965 if let Some(ref guardrails) = inference.guardrails {
2966 if let Some(ref pii_config) = guardrails.pii_detection {
2967 if pii_config.enabled {
2968 if let Some(ref counter) = ctx.inference_streaming_counter {
2970 let response_content = counter.content();
2971 if !response_content.is_empty() {
2972 let pii_result = self
2973 .guardrail_processor
2974 .check_pii(
2975 pii_config,
2976 response_content,
2977 ctx.route_id.as_deref(),
2978 &ctx.trace_id,
2979 )
2980 .await;
2981
2982 match pii_result {
2983 crate::inference::PiiCheckResult::Detected {
2984 detections,
2985 redacted_content: _,
2986 } => {
2987 warn!(
2988 correlation_id = %ctx.trace_id,
2989 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2990 detection_count = detections.len(),
2991 "PII detected in inference response"
2992 );
2993
2994 ctx.pii_detection_categories = detections
2996 .iter()
2997 .map(|d| d.category.clone())
2998 .collect();
2999
3000 for detection in &detections {
3002 self.metrics.record_pii_detected(
3003 ctx.route_id.as_deref().unwrap_or("unknown"),
3004 &detection.category,
3005 );
3006 }
3007 }
3008 crate::inference::PiiCheckResult::Clean => {
3009 trace!(
3010 correlation_id = %ctx.trace_id,
3011 "No PII detected in response"
3012 );
3013 }
3014 crate::inference::PiiCheckResult::Error { message } => {
3015 debug!(
3016 correlation_id = %ctx.trace_id,
3017 error = %message,
3018 "PII detection check failed"
3019 );
3020 }
3021 }
3022 }
3023 }
3024 }
3025 }
3026 }
3027 }
3028 }
3029 }
3030
3031 let empty_body: &[u8] = &[];
3035
3036 if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3037 route_id,
3038 rate_limit_key,
3039 &response_headers,
3040 empty_body,
3041 ctx.inference_estimated_tokens,
3042 ) {
3043 let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
3045 if streaming.total_tokens.is_some() {
3047 (streaming.total_tokens.unwrap(), "streaming_api")
3048 } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
3049 let total = ctx.inference_input_tokens + streaming.output_tokens;
3052 (total, "streaming_tiktoken")
3053 } else {
3054 (actual_estimate.tokens, "headers")
3055 }
3056 } else {
3057 (actual_estimate.tokens, "headers")
3058 };
3059
3060 ctx.inference_actual_tokens = Some(actual_tokens);
3061
3062 debug!(
3063 correlation_id = %ctx.trace_id,
3064 route_id = route_id,
3065 estimated_tokens = ctx.inference_estimated_tokens,
3066 actual_tokens = actual_tokens,
3067 source = source_info,
3068 streaming_response = ctx.inference_streaming_response,
3069 model = ?ctx.inference_model,
3070 "Recorded actual inference tokens"
3071 );
3072
3073 if ctx.inference_budget_enabled {
3075 let alerts = self.inference_rate_limit_manager.record_budget(
3076 route_id,
3077 rate_limit_key,
3078 actual_tokens,
3079 );
3080
3081 for alert in alerts.iter() {
3083 warn!(
3084 correlation_id = %ctx.trace_id,
3085 route_id = route_id,
3086 tenant = %alert.tenant,
3087 threshold_pct = alert.threshold * 100.0,
3088 tokens_used = alert.tokens_used,
3089 tokens_limit = alert.tokens_limit,
3090 "Token budget alert threshold crossed"
3091 );
3092 }
3093
3094 if let Some(status) = self.inference_rate_limit_manager.budget_status(
3096 route_id,
3097 rate_limit_key,
3098 ) {
3099 ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3100 }
3101 }
3102
3103 if ctx.inference_cost_enabled {
3105 if let Some(model) = ctx.inference_model.as_deref() {
3106 let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3108 let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3110 let output = streaming.output_tokens;
3111 (input, output)
3112 } else {
3113 let input = ctx.inference_input_tokens;
3115 let output = actual_tokens.saturating_sub(input);
3116 (input, output)
3117 };
3118 ctx.inference_output_tokens = output_tokens;
3119
3120 if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3121 route_id,
3122 model,
3123 input_tokens,
3124 output_tokens,
3125 ) {
3126 ctx.inference_request_cost = Some(cost_result.total_cost);
3127
3128 trace!(
3129 correlation_id = %ctx.trace_id,
3130 route_id = route_id,
3131 model = model,
3132 input_tokens = input_tokens,
3133 output_tokens = output_tokens,
3134 total_cost = cost_result.total_cost,
3135 currency = %cost_result.currency,
3136 "Calculated inference request cost"
3137 );
3138 }
3139 }
3140 }
3141 }
3142 }
3143 }
3144
3145 if self.log_manager.access_log_enabled() {
3147 let access_entry = AccessLogEntry {
3148 timestamp: chrono::Utc::now().to_rfc3339(),
3149 trace_id: ctx.trace_id.clone(),
3150 method: ctx.method.clone(),
3151 path: ctx.path.clone(),
3152 query: ctx.query.clone(),
3153 protocol: "HTTP/1.1".to_string(),
3154 status,
3155 body_bytes: ctx.response_bytes,
3156 duration_ms: duration.as_millis() as u64,
3157 client_ip: ctx.client_ip.clone(),
3158 user_agent: ctx.user_agent.clone(),
3159 referer: ctx.referer.clone(),
3160 host: ctx.host.clone(),
3161 route_id: ctx.route_id.clone(),
3162 upstream: ctx.upstream.clone(),
3163 upstream_attempts: ctx.upstream_attempts,
3164 instance_id: self.app_state.instance_id.clone(),
3165 namespace: ctx.namespace.clone(),
3166 service: ctx.service.clone(),
3167 body_bytes_sent: ctx.response_bytes,
3169 upstream_addr: ctx.selected_upstream_address.clone(),
3170 connection_reused: ctx.connection_reused,
3171 rate_limit_hit: status == 429,
3172 geo_country: ctx.geo_country_code.clone(),
3173 };
3174 self.log_manager.log_access(&access_entry);
3175 }
3176
3177 if tracing::enabled!(tracing::Level::DEBUG) {
3179 debug!(
3180 trace_id = %ctx.trace_id,
3181 method = %ctx.method,
3182 path = %ctx.path,
3183 route_id = ?ctx.route_id,
3184 upstream = ?ctx.upstream,
3185 status = status,
3186 duration_ms = duration.as_millis() as u64,
3187 upstream_attempts = ctx.upstream_attempts,
3188 error = ?_error.map(|e| e.to_string()),
3189 "Request completed"
3190 );
3191 }
3192
3193 if ctx.is_websocket_upgrade && status == 101 {
3195 info!(
3196 trace_id = %ctx.trace_id,
3197 route_id = ?ctx.route_id,
3198 upstream = ?ctx.upstream,
3199 client_ip = %ctx.client_ip,
3200 "WebSocket connection established"
3201 );
3202 }
3203
3204 if let Some(span) = ctx.otel_span.take() {
3206 span.end();
3207 }
3208 }
3209}
3210
3211impl SentinelProxy {
3216 async fn process_body_chunk_streaming(
3218 &self,
3219 body: &mut Option<Bytes>,
3220 end_of_stream: bool,
3221 ctx: &mut RequestContext,
3222 ) -> Result<(), Box<Error>> {
3223 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3225 let chunk_index = ctx.request_body_chunk_index;
3226 ctx.request_body_chunk_index += 1;
3227 ctx.body_bytes_inspected += chunk_data.len() as u64;
3228
3229 debug!(
3230 correlation_id = %ctx.trace_id,
3231 chunk_index = chunk_index,
3232 chunk_size = chunk_data.len(),
3233 end_of_stream = end_of_stream,
3234 "Streaming body chunk to agents"
3235 );
3236
3237 let agent_ctx = crate::agents::AgentCallContext {
3239 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3240 metadata: sentinel_agent_protocol::RequestMetadata {
3241 correlation_id: ctx.trace_id.clone(),
3242 request_id: ctx.trace_id.clone(),
3243 client_ip: ctx.client_ip.clone(),
3244 client_port: 0,
3245 server_name: ctx.host.clone(),
3246 protocol: "HTTP/1.1".to_string(),
3247 tls_version: None,
3248 tls_cipher: None,
3249 route_id: ctx.route_id.clone(),
3250 upstream_id: ctx.upstream.clone(),
3251 timestamp: chrono::Utc::now().to_rfc3339(),
3252 traceparent: ctx.traceparent(),
3253 },
3254 route_id: ctx.route_id.clone(),
3255 upstream_id: ctx.upstream.clone(),
3256 request_body: None, response_body: None,
3258 };
3259
3260 let agent_ids = ctx.body_inspection_agents.clone();
3261 let total_size = None; match self
3264 .agent_manager
3265 .process_request_body_streaming(
3266 &agent_ctx,
3267 &chunk_data,
3268 end_of_stream,
3269 chunk_index,
3270 ctx.body_bytes_inspected as usize,
3271 total_size,
3272 &agent_ids,
3273 )
3274 .await
3275 {
3276 Ok(decision) => {
3277 ctx.agent_needs_more = decision.needs_more;
3279
3280 if let Some(ref mutation) = decision.request_body_mutation {
3282 if !mutation.is_pass_through() {
3283 if mutation.is_drop() {
3284 *body = None;
3286 trace!(
3287 correlation_id = %ctx.trace_id,
3288 chunk_index = chunk_index,
3289 "Agent dropped body chunk"
3290 );
3291 } else if let Some(ref new_data) = mutation.data {
3292 *body = Some(Bytes::from(new_data.clone()));
3294 trace!(
3295 correlation_id = %ctx.trace_id,
3296 chunk_index = chunk_index,
3297 original_size = chunk_data.len(),
3298 new_size = new_data.len(),
3299 "Agent mutated body chunk"
3300 );
3301 }
3302 }
3303 }
3304
3305 if !decision.needs_more && !decision.is_allow() {
3307 warn!(
3308 correlation_id = %ctx.trace_id,
3309 action = ?decision.action,
3310 "Agent blocked request body"
3311 );
3312 self.metrics.record_blocked_request("agent_body_inspection");
3313
3314 let (status, message) = match &decision.action {
3315 crate::agents::AgentAction::Block { status, body, .. } => (
3316 *status,
3317 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3318 ),
3319 _ => (403, "Forbidden".to_string()),
3320 };
3321
3322 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3323 }
3324
3325 trace!(
3326 correlation_id = %ctx.trace_id,
3327 needs_more = decision.needs_more,
3328 "Agent processed body chunk"
3329 );
3330 }
3331 Err(e) => {
3332 let fail_closed = ctx
3333 .route_config
3334 .as_ref()
3335 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3336 .unwrap_or(false);
3337
3338 if fail_closed {
3339 error!(
3340 correlation_id = %ctx.trace_id,
3341 error = %e,
3342 "Agent streaming body inspection failed, blocking (fail-closed)"
3343 );
3344 return Err(Error::explain(
3345 ErrorType::HTTPStatus(503),
3346 "Service unavailable",
3347 ));
3348 } else {
3349 warn!(
3350 correlation_id = %ctx.trace_id,
3351 error = %e,
3352 "Agent streaming body inspection failed, allowing (fail-open)"
3353 );
3354 }
3355 }
3356 }
3357
3358 Ok(())
3359 }
3360
3361 async fn send_buffered_body_to_agents(
3363 &self,
3364 end_of_stream: bool,
3365 ctx: &mut RequestContext,
3366 ) -> Result<(), Box<Error>> {
3367 debug!(
3368 correlation_id = %ctx.trace_id,
3369 buffer_size = ctx.body_buffer.len(),
3370 end_of_stream = end_of_stream,
3371 agent_count = ctx.body_inspection_agents.len(),
3372 decompression_enabled = ctx.decompression_enabled,
3373 "Sending buffered body to agents for inspection"
3374 );
3375
3376 let body_for_inspection = if ctx.decompression_enabled {
3378 if let Some(ref encoding) = ctx.body_content_encoding {
3379 let config = crate::decompression::DecompressionConfig {
3380 max_ratio: ctx.max_decompression_ratio,
3381 max_output_bytes: ctx.max_decompression_bytes,
3382 };
3383
3384 match crate::decompression::decompress_body(
3385 &ctx.body_buffer,
3386 encoding,
3387 &config,
3388 ) {
3389 Ok(result) => {
3390 ctx.body_was_decompressed = true;
3391 self.metrics
3392 .record_decompression_success(encoding, result.ratio);
3393 debug!(
3394 correlation_id = %ctx.trace_id,
3395 encoding = %encoding,
3396 compressed_size = result.compressed_size,
3397 decompressed_size = result.decompressed_size,
3398 ratio = result.ratio,
3399 "Body decompressed for agent inspection"
3400 );
3401 result.data
3402 }
3403 Err(e) => {
3404 let failure_reason = match &e {
3406 crate::decompression::DecompressionError::RatioExceeded { .. } => {
3407 "ratio_exceeded"
3408 }
3409 crate::decompression::DecompressionError::SizeExceeded { .. } => {
3410 "size_exceeded"
3411 }
3412 crate::decompression::DecompressionError::InvalidData { .. } => {
3413 "invalid_data"
3414 }
3415 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3416 "unsupported"
3417 }
3418 crate::decompression::DecompressionError::IoError(_) => "io_error",
3419 };
3420 self.metrics
3421 .record_decompression_failure(encoding, failure_reason);
3422
3423 let fail_closed = ctx
3425 .route_config
3426 .as_ref()
3427 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3428 .unwrap_or(false);
3429
3430 if fail_closed {
3431 error!(
3432 correlation_id = %ctx.trace_id,
3433 error = %e,
3434 encoding = %encoding,
3435 "Decompression failed, blocking (fail-closed)"
3436 );
3437 return Err(Error::explain(
3438 ErrorType::HTTPStatus(400),
3439 "Invalid compressed body",
3440 ));
3441 } else {
3442 warn!(
3443 correlation_id = %ctx.trace_id,
3444 error = %e,
3445 encoding = %encoding,
3446 "Decompression failed, sending compressed body (fail-open)"
3447 );
3448 ctx.body_buffer.clone()
3449 }
3450 }
3451 }
3452 } else {
3453 ctx.body_buffer.clone()
3454 }
3455 } else {
3456 ctx.body_buffer.clone()
3457 };
3458
3459 let agent_ctx = crate::agents::AgentCallContext {
3460 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3461 metadata: sentinel_agent_protocol::RequestMetadata {
3462 correlation_id: ctx.trace_id.clone(),
3463 request_id: ctx.trace_id.clone(),
3464 client_ip: ctx.client_ip.clone(),
3465 client_port: 0,
3466 server_name: ctx.host.clone(),
3467 protocol: "HTTP/1.1".to_string(),
3468 tls_version: None,
3469 tls_cipher: None,
3470 route_id: ctx.route_id.clone(),
3471 upstream_id: ctx.upstream.clone(),
3472 timestamp: chrono::Utc::now().to_rfc3339(),
3473 traceparent: ctx.traceparent(),
3474 },
3475 route_id: ctx.route_id.clone(),
3476 upstream_id: ctx.upstream.clone(),
3477 request_body: Some(body_for_inspection.clone()),
3478 response_body: None,
3479 };
3480
3481 let agent_ids = ctx.body_inspection_agents.clone();
3482 match self
3483 .agent_manager
3484 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3485 .await
3486 {
3487 Ok(decision) => {
3488 if !decision.is_allow() {
3489 warn!(
3490 correlation_id = %ctx.trace_id,
3491 action = ?decision.action,
3492 "Agent blocked request body"
3493 );
3494 self.metrics.record_blocked_request("agent_body_inspection");
3495
3496 let (status, message) = match &decision.action {
3497 crate::agents::AgentAction::Block { status, body, .. } => (
3498 *status,
3499 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3500 ),
3501 _ => (403, "Forbidden".to_string()),
3502 };
3503
3504 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3505 }
3506
3507 trace!(
3508 correlation_id = %ctx.trace_id,
3509 "Agent allowed request body"
3510 );
3511 }
3512 Err(e) => {
3513 let fail_closed = ctx
3514 .route_config
3515 .as_ref()
3516 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3517 .unwrap_or(false);
3518
3519 if fail_closed {
3520 error!(
3521 correlation_id = %ctx.trace_id,
3522 error = %e,
3523 "Agent body inspection failed, blocking (fail-closed)"
3524 );
3525 return Err(Error::explain(
3526 ErrorType::HTTPStatus(503),
3527 "Service unavailable",
3528 ));
3529 } else {
3530 warn!(
3531 correlation_id = %ctx.trace_id,
3532 error = %e,
3533 "Agent body inspection failed, allowing (fail-open)"
3534 );
3535 }
3536 }
3537 }
3538
3539 Ok(())
3540 }
3541}