1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23 extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39 fn get_header(&self, _name: &str) -> Option<String> {
40 None
41 }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46 type CTX = RequestContext;
47
48 fn new_ctx(&self) -> Self::CTX {
49 RequestContext::new()
50 }
51
52 fn fail_to_connect(
53 &self,
54 _session: &mut Session,
55 peer: &HttpPeer,
56 ctx: &mut Self::CTX,
57 e: Box<Error>,
58 ) -> Box<Error> {
59 error!(
60 correlation_id = %ctx.trace_id,
61 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63 peer_address = %peer.address(),
64 error = %e,
65 "Failed to connect to upstream peer"
66 );
67 e
69 }
70
71 async fn early_request_filter(
74 &self,
75 session: &mut Session,
76 ctx: &mut Self::CTX,
77 ) -> Result<(), Box<Error>> {
78 let req_header = session.req_header();
80 let method = req_header.method.as_str();
81 let path = req_header.uri.path();
82 let host = req_header
83 .headers
84 .get("host")
85 .and_then(|h| h.to_str().ok())
86 .unwrap_or("");
87
88 if let Some(ref challenge_manager) = self.acme_challenges {
90 if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
91 if let Some(key_authorization) = challenge_manager.get_response(token) {
92 debug!(
93 token = %token,
94 "Serving ACME HTTP-01 challenge response"
95 );
96
97 let mut resp = ResponseHeader::build(200, None)?;
99 resp.insert_header("Content-Type", "text/plain")?;
100 resp.insert_header("Content-Length", key_authorization.len().to_string())?;
101
102 session.write_response_header(Box::new(resp), false).await?;
104 session
105 .write_response_body(Some(Bytes::from(key_authorization)), true)
106 .await?;
107
108 return Err(Error::explain(
110 ErrorType::InternalError,
111 "ACME challenge served",
112 ));
113 } else {
114 warn!(
116 token = %token,
117 "ACME challenge token not found"
118 );
119 }
120 }
121 }
122
123 ctx.method = method.to_string();
124 ctx.path = path.to_string();
125 ctx.host = Some(host.to_string());
126
127 let route_match = {
129 let route_matcher = self.route_matcher.read();
130 let request_info = RequestInfo::new(method, path, host);
131 match route_matcher.match_request(&request_info) {
132 Some(m) => m,
133 None => return Ok(()), }
135 };
136
137 ctx.trace_id = self.get_trace_id(session);
138 ctx.route_id = Some(route_match.route_id.to_string());
139 ctx.route_config = Some(route_match.config.clone());
140
141 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
143 if let Ok(s) = traceparent.to_str() {
144 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
145 }
146 }
147
148 if let Some(tracer) = crate::otel::get_tracer() {
150 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
151 }
152
153 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
155 trace!(
156 correlation_id = %ctx.trace_id,
157 route_id = %route_match.route_id,
158 builtin_handler = ?route_match.config.builtin_handler,
159 "Handling builtin route in early_request_filter"
160 );
161
162 let handled = self
164 .handle_builtin_route(session, ctx, &route_match)
165 .await?;
166
167 if handled {
168 return Err(Error::explain(
170 ErrorType::InternalError,
171 "Builtin handler complete",
172 ));
173 }
174 }
175
176 Ok(())
177 }
178
179 async fn upstream_peer(
180 &self,
181 session: &mut Session,
182 ctx: &mut Self::CTX,
183 ) -> Result<Box<HttpPeer>, Box<Error>> {
184 self.reload_coordinator.inc_requests();
186
187 if ctx.config.is_none() {
189 ctx.config = Some(self.config_manager.current());
190 }
191
192 if ctx.client_ip.is_empty() {
194 ctx.client_ip = session
195 .client_addr()
196 .map(|a| a.to_string())
197 .unwrap_or_else(|| "unknown".to_string());
198 }
199
200 let req_header = session.req_header();
201
202 if ctx.method.is_empty() {
204 ctx.method = req_header.method.to_string();
205 ctx.path = req_header.uri.path().to_string();
206 ctx.query = req_header.uri.query().map(|q| q.to_string());
207 ctx.host = req_header
208 .headers
209 .get("host")
210 .and_then(|v| v.to_str().ok())
211 .map(|s| s.to_string());
212 }
213 ctx.user_agent = req_header
214 .headers
215 .get("user-agent")
216 .and_then(|v| v.to_str().ok())
217 .map(|s| s.to_string());
218 ctx.referer = req_header
219 .headers
220 .get("referer")
221 .and_then(|v| v.to_str().ok())
222 .map(|s| s.to_string());
223
224 trace!(
225 correlation_id = %ctx.trace_id,
226 client_ip = %ctx.client_ip,
227 "Request received, initializing context"
228 );
229
230 let route_match = if let Some(ref route_config) = ctx.route_config {
232 let route_id = ctx.route_id.as_deref().unwrap_or("");
233 crate::routing::RouteMatch {
234 route_id: sentinel_common::RouteId::new(route_id),
235 config: route_config.clone(),
236 }
237 } else {
238 let (match_result, route_duration) = {
240 let route_matcher = self.route_matcher.read();
241 let host = ctx.host.as_deref().unwrap_or("");
242
243 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
245
246 if route_matcher.needs_headers() {
248 request_info = request_info
249 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
250 }
251
252 if route_matcher.needs_query_params() {
254 request_info =
255 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
256 }
257
258 trace!(
259 correlation_id = %ctx.trace_id,
260 method = %request_info.method,
261 path = %request_info.path,
262 host = %request_info.host,
263 "Built request info for route matching"
264 );
265
266 let route_start = std::time::Instant::now();
267 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
268 warn!(
269 correlation_id = %ctx.trace_id,
270 method = %request_info.method,
271 path = %request_info.path,
272 host = %request_info.host,
273 "No matching route found for request"
274 );
275 Error::explain(ErrorType::InternalError, "No matching route found")
276 })?;
277 let route_duration = route_start.elapsed();
278 (route_match, route_duration)
280 };
281
282 ctx.route_id = Some(match_result.route_id.to_string());
283 ctx.route_config = Some(match_result.config.clone());
284
285 if ctx.trace_id.is_empty() {
287 ctx.trace_id = self.get_trace_id(session);
288
289 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
291 if let Ok(s) = traceparent.to_str() {
292 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
293 }
294 }
295
296 if let Some(tracer) = crate::otel::get_tracer() {
298 ctx.otel_span =
299 Some(tracer.start_span(&ctx.method, &ctx.path, ctx.trace_context.as_ref()));
300 }
301 }
302
303 trace!(
304 correlation_id = %ctx.trace_id,
305 route_id = %match_result.route_id,
306 route_duration_us = route_duration.as_micros(),
307 service_type = ?match_result.config.service_type,
308 "Route matched"
309 );
310 match_result
311 };
312
313 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
315 trace!(
316 correlation_id = %ctx.trace_id,
317 route_id = %route_match.route_id,
318 builtin_handler = ?route_match.config.builtin_handler,
319 "Route type is builtin, skipping upstream"
320 );
321 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
323 return Err(Error::explain(
325 ErrorType::InternalError,
326 "Builtin handler handled in request_filter",
327 ));
328 }
329
330 if route_match.config.service_type == sentinel_config::ServiceType::Static {
332 trace!(
333 correlation_id = %ctx.trace_id,
334 route_id = %route_match.route_id,
335 "Route type is static, checking for static server"
336 );
337 if self
339 .static_servers
340 .get(route_match.route_id.as_str())
341 .await
342 .is_some()
343 {
344 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
346 info!(
347 correlation_id = %ctx.trace_id,
348 route_id = %route_match.route_id,
349 path = %ctx.path,
350 "Serving static file"
351 );
352 return Err(Error::explain(
354 ErrorType::InternalError,
355 "Static file serving handled in request_filter",
356 ));
357 }
358 }
359
360 let mut model_routing_applied = false;
363 if let Some(ref inference) = route_match.config.inference {
364 if let Some(ref model_routing) = inference.model_routing {
365 let model = model_routing::extract_model_from_headers(&req_header.headers);
367
368 if let Some(ref model_name) = model {
369 if let Some(routing_result) =
371 model_routing::find_upstream_for_model(model_routing, model_name)
372 {
373 debug!(
374 correlation_id = %ctx.trace_id,
375 route_id = %route_match.route_id,
376 model = %model_name,
377 upstream = %routing_result.upstream,
378 is_default = routing_result.is_default,
379 provider_override = ?routing_result.provider,
380 "Model-based routing selected upstream"
381 );
382
383 ctx.record_model_routing(
384 &routing_result.upstream,
385 Some(model_name.clone()),
386 routing_result.provider,
387 );
388 model_routing_applied = true;
389
390 if let Some(metrics) = get_model_routing_metrics() {
392 metrics.record_model_routed(
393 route_match.route_id.as_str(),
394 model_name,
395 &routing_result.upstream,
396 );
397 if routing_result.is_default {
398 metrics.record_default_upstream(route_match.route_id.as_str());
399 }
400 if let Some(provider) = routing_result.provider {
401 metrics.record_provider_override(
402 route_match.route_id.as_str(),
403 &routing_result.upstream,
404 provider.as_str(),
405 );
406 }
407 }
408 }
409 } else if let Some(ref default_upstream) = model_routing.default_upstream {
410 debug!(
412 correlation_id = %ctx.trace_id,
413 route_id = %route_match.route_id,
414 upstream = %default_upstream,
415 "Model-based routing using default upstream (no model header)"
416 );
417 ctx.record_model_routing(default_upstream, None, None);
418 model_routing_applied = true;
419
420 if let Some(metrics) = get_model_routing_metrics() {
422 metrics.record_no_model_header(route_match.route_id.as_str());
423 }
424 }
425 }
426 }
427
428 if !model_routing_applied {
430 if let Some(ref upstream) = route_match.config.upstream {
431 ctx.upstream = Some(upstream.clone());
432 trace!(
433 correlation_id = %ctx.trace_id,
434 route_id = %route_match.route_id,
435 upstream = %upstream,
436 "Upstream configured for route"
437 );
438 } else {
439 error!(
440 correlation_id = %ctx.trace_id,
441 route_id = %route_match.route_id,
442 "Route has no upstream configured"
443 );
444 return Err(Error::explain(
445 ErrorType::InternalError,
446 format!(
447 "Route '{}' has no upstream configured",
448 route_match.route_id
449 ),
450 ));
451 }
452 }
453
454 if let Some(ref fallback_config) = route_match.config.fallback {
457 let upstream_name = ctx.upstream.as_ref().unwrap();
458
459 let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
461 pool.has_healthy_targets().await
462 } else {
463 false };
465
466 let is_budget_exhausted = ctx.inference_budget_exhausted;
468
469 let current_model = ctx.inference_model.as_deref();
471
472 let evaluator = FallbackEvaluator::new(
474 fallback_config,
475 ctx.tried_upstreams(),
476 ctx.fallback_attempt,
477 );
478
479 if let Some(decision) = evaluator.should_fallback_before_request(
481 upstream_name,
482 is_healthy,
483 is_budget_exhausted,
484 current_model,
485 ) {
486 info!(
487 correlation_id = %ctx.trace_id,
488 route_id = %route_match.route_id,
489 from_upstream = %upstream_name,
490 to_upstream = %decision.next_upstream,
491 reason = %decision.reason,
492 fallback_attempt = ctx.fallback_attempt + 1,
493 "Triggering fallback routing"
494 );
495
496 if let Some(metrics) = get_fallback_metrics() {
498 metrics.record_fallback_attempt(
499 route_match.route_id.as_str(),
500 upstream_name,
501 &decision.next_upstream,
502 &decision.reason,
503 );
504 }
505
506 ctx.record_fallback(decision.reason, &decision.next_upstream);
508
509 if let Some((original, mapped)) = decision.model_mapping {
511 if let Some(metrics) = get_fallback_metrics() {
513 metrics.record_model_mapping(
514 route_match.route_id.as_str(),
515 &original,
516 &mapped,
517 );
518 }
519
520 ctx.record_model_mapping(original, mapped);
521 trace!(
522 correlation_id = %ctx.trace_id,
523 original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
524 mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
525 "Applied model mapping for fallback"
526 );
527 }
528 }
529 }
530
531 debug!(
532 correlation_id = %ctx.trace_id,
533 route_id = %route_match.route_id,
534 upstream = ?ctx.upstream,
535 method = %req_header.method,
536 path = %req_header.uri.path(),
537 host = ctx.host.as_deref().unwrap_or("-"),
538 client_ip = %ctx.client_ip,
539 "Processing request"
540 );
541
542 if ctx
544 .upstream
545 .as_ref()
546 .is_some_and(|u| u.starts_with("_static_"))
547 {
548 return Err(Error::explain(
550 ErrorType::InternalError,
551 "Static route should be handled in request_filter",
552 ));
553 }
554
555 let upstream_name = ctx
556 .upstream
557 .as_ref()
558 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
559
560 trace!(
561 correlation_id = %ctx.trace_id,
562 upstream = %upstream_name,
563 "Looking up upstream pool"
564 );
565
566 let pool = self
567 .upstream_pools
568 .get(upstream_name)
569 .await
570 .ok_or_else(|| {
571 error!(
572 correlation_id = %ctx.trace_id,
573 upstream = %upstream_name,
574 "Upstream pool not found"
575 );
576 Error::explain(
577 ErrorType::InternalError,
578 format!("Upstream pool '{}' not found", upstream_name),
579 )
580 })?;
581
582 let max_retries = route_match
584 .config
585 .retry_policy
586 .as_ref()
587 .map(|r| r.max_attempts)
588 .unwrap_or(1);
589
590 trace!(
591 correlation_id = %ctx.trace_id,
592 upstream = %upstream_name,
593 max_retries = max_retries,
594 "Starting upstream peer selection"
595 );
596
597 let mut last_error = None;
598 let selection_start = std::time::Instant::now();
599
600 for attempt in 1..=max_retries {
601 ctx.upstream_attempts = attempt;
602
603 trace!(
604 correlation_id = %ctx.trace_id,
605 upstream = %upstream_name,
606 attempt = attempt,
607 max_retries = max_retries,
608 "Attempting to select upstream peer"
609 );
610
611 match pool.select_peer_with_metadata(None).await {
612 Ok((peer, metadata)) => {
613 let selection_duration = selection_start.elapsed();
614 let peer_addr = peer.address().to_string();
616 ctx.selected_upstream_address = Some(peer_addr.clone());
617
618 if metadata.contains_key("sticky_session_new") {
620 ctx.sticky_session_new_assignment = true;
621 ctx.sticky_session_set_cookie =
622 metadata.get("sticky_set_cookie_header").cloned();
623 ctx.sticky_target_index = metadata
624 .get("sticky_target_index")
625 .and_then(|s| s.parse().ok());
626
627 trace!(
628 correlation_id = %ctx.trace_id,
629 sticky_target_index = ?ctx.sticky_target_index,
630 "New sticky session assignment, will set cookie"
631 );
632 }
633
634 debug!(
635 correlation_id = %ctx.trace_id,
636 upstream = %upstream_name,
637 peer_address = %peer_addr,
638 attempt = attempt,
639 selection_duration_us = selection_duration.as_micros(),
640 sticky_session_hit = metadata.contains_key("sticky_session_hit"),
641 sticky_session_new = ctx.sticky_session_new_assignment,
642 "Selected upstream peer"
643 );
644 return Ok(Box::new(peer));
645 }
646 Err(e) => {
647 warn!(
648 correlation_id = %ctx.trace_id,
649 upstream = %upstream_name,
650 attempt = attempt,
651 max_retries = max_retries,
652 error = %e,
653 "Failed to select upstream peer"
654 );
655 last_error = Some(e);
656
657 if attempt < max_retries {
658 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
660 trace!(
661 correlation_id = %ctx.trace_id,
662 backoff_ms = backoff.as_millis(),
663 "Backing off before retry"
664 );
665 sleep(backoff).await;
666 }
667 }
668 }
669 }
670
671 let selection_duration = selection_start.elapsed();
672 error!(
673 correlation_id = %ctx.trace_id,
674 upstream = %upstream_name,
675 attempts = max_retries,
676 selection_duration_ms = selection_duration.as_millis(),
677 last_error = ?last_error,
678 "All upstream selection attempts failed"
679 );
680
681 if ctx.used_fallback() {
683 if let Some(metrics) = get_fallback_metrics() {
684 metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
685 }
686 }
687
688 Err(Error::explain(
689 ErrorType::InternalError,
690 format!("All upstream attempts failed: {:?}", last_error),
691 ))
692 }
693
694 async fn request_filter(
695 &self,
696 session: &mut Session,
697 ctx: &mut Self::CTX,
698 ) -> Result<bool, Box<Error>> {
699 trace!(
700 correlation_id = %ctx.trace_id,
701 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
702 "Starting request filter phase"
703 );
704
705 if let Some(route_id) = ctx.route_id.as_deref() {
708 if self.rate_limit_manager.has_route_limiter(route_id) {
709 let rate_result = self.rate_limit_manager.check(
710 route_id,
711 &ctx.client_ip,
712 &ctx.path,
713 Option::<&NoHeaderAccessor>::None,
714 );
715
716 if rate_result.limit > 0 {
718 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
719 limit: rate_result.limit,
720 remaining: rate_result.remaining,
721 reset_at: rate_result.reset_at,
722 });
723 }
724
725 if !rate_result.allowed {
726 use sentinel_config::RateLimitAction;
727
728 match rate_result.action {
729 RateLimitAction::Reject => {
730 warn!(
731 correlation_id = %ctx.trace_id,
732 route_id = route_id,
733 client_ip = %ctx.client_ip,
734 limiter = %rate_result.limiter,
735 limit = rate_result.limit,
736 remaining = rate_result.remaining,
737 "Request rate limited"
738 );
739 self.metrics.record_blocked_request("rate_limited");
740
741 let audit_entry = AuditLogEntry::rate_limited(
743 &ctx.trace_id,
744 &ctx.method,
745 &ctx.path,
746 &ctx.client_ip,
747 &rate_result.limiter,
748 )
749 .with_route_id(route_id)
750 .with_status_code(rate_result.status_code);
751 self.log_manager.log_audit(&audit_entry);
752
753 let body = rate_result
755 .message
756 .unwrap_or_else(|| "Rate limit exceeded".to_string());
757
758 let retry_after = rate_result.reset_at.saturating_sub(
760 std::time::SystemTime::now()
761 .duration_since(std::time::UNIX_EPOCH)
762 .unwrap_or_default()
763 .as_secs(),
764 );
765 crate::http_helpers::write_rate_limit_error(
766 session,
767 rate_result.status_code,
768 &body,
769 rate_result.limit,
770 rate_result.remaining,
771 rate_result.reset_at,
772 retry_after,
773 )
774 .await?;
775 return Ok(true); }
777 RateLimitAction::LogOnly => {
778 debug!(
779 correlation_id = %ctx.trace_id,
780 route_id = route_id,
781 "Rate limit exceeded (log only mode)"
782 );
783 }
785 RateLimitAction::Delay => {
786 if let Some(delay_ms) = rate_result.suggested_delay_ms {
788 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
790
791 if actual_delay > 0 {
792 debug!(
793 correlation_id = %ctx.trace_id,
794 route_id = route_id,
795 suggested_delay_ms = delay_ms,
796 max_delay_ms = rate_result.max_delay_ms,
797 actual_delay_ms = actual_delay,
798 "Applying rate limit delay"
799 );
800
801 tokio::time::sleep(std::time::Duration::from_millis(
802 actual_delay,
803 ))
804 .await;
805 }
806 }
807 }
809 }
810 }
811 }
812 }
813
814 if let Some(route_id) = ctx.route_id.as_deref() {
817 if let Some(ref route_config) = ctx.route_config {
818 if route_config.service_type == sentinel_config::ServiceType::Inference
819 && self.inference_rate_limit_manager.has_route(route_id)
820 {
821 let headers = &session.req_header().headers;
824
825 let body = ctx.body_buffer.as_slice();
827
828 let rate_limit_key = &ctx.client_ip;
830
831 if let Some(check_result) = self.inference_rate_limit_manager.check(
832 route_id,
833 rate_limit_key,
834 headers,
835 body,
836 ) {
837 ctx.inference_rate_limit_enabled = true;
839 ctx.inference_estimated_tokens = check_result.estimated_tokens;
840 ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
841 ctx.inference_model = check_result.model.clone();
842
843 if !check_result.is_allowed() {
844 let retry_after_ms = check_result.retry_after_ms();
845 let retry_after_secs = retry_after_ms.div_ceil(1000);
846
847 warn!(
848 correlation_id = %ctx.trace_id,
849 route_id = route_id,
850 client_ip = %ctx.client_ip,
851 estimated_tokens = check_result.estimated_tokens,
852 model = ?check_result.model,
853 retry_after_ms = retry_after_ms,
854 "Inference rate limit exceeded (tokens)"
855 );
856 self.metrics
857 .record_blocked_request("inference_rate_limited");
858
859 let audit_entry = AuditLogEntry::new(
861 &ctx.trace_id,
862 AuditEventType::RateLimitExceeded,
863 &ctx.method,
864 &ctx.path,
865 &ctx.client_ip,
866 )
867 .with_route_id(route_id)
868 .with_status_code(429)
869 .with_reason(format!(
870 "Token rate limit exceeded: estimated {} tokens, model={:?}",
871 check_result.estimated_tokens, check_result.model
872 ));
873 self.log_manager.log_audit(&audit_entry);
874
875 let body = "Token rate limit exceeded";
877 let reset_at = std::time::SystemTime::now()
878 .duration_since(std::time::UNIX_EPOCH)
879 .unwrap_or_default()
880 .as_secs()
881 + retry_after_secs;
882
883 crate::http_helpers::write_rate_limit_error(
885 session,
886 429,
887 body,
888 0, 0, reset_at,
891 retry_after_secs,
892 )
893 .await?;
894 return Ok(true); }
896
897 trace!(
898 correlation_id = %ctx.trace_id,
899 route_id = route_id,
900 estimated_tokens = check_result.estimated_tokens,
901 model = ?check_result.model,
902 "Inference rate limit check passed"
903 );
904
905 if self.inference_rate_limit_manager.has_budget(route_id) {
907 ctx.inference_budget_enabled = true;
908
909 if let Some(budget_result) =
910 self.inference_rate_limit_manager.check_budget(
911 route_id,
912 rate_limit_key,
913 check_result.estimated_tokens,
914 )
915 {
916 if !budget_result.is_allowed() {
917 let retry_after_secs = budget_result.retry_after_secs();
918
919 warn!(
920 correlation_id = %ctx.trace_id,
921 route_id = route_id,
922 client_ip = %ctx.client_ip,
923 estimated_tokens = check_result.estimated_tokens,
924 retry_after_secs = retry_after_secs,
925 "Token budget exhausted"
926 );
927
928 ctx.inference_budget_exhausted = true;
929 self.metrics.record_blocked_request("budget_exhausted");
930
931 let audit_entry = AuditLogEntry::new(
933 &ctx.trace_id,
934 AuditEventType::RateLimitExceeded,
935 &ctx.method,
936 &ctx.path,
937 &ctx.client_ip,
938 )
939 .with_route_id(route_id)
940 .with_status_code(429)
941 .with_reason("Token budget exhausted".to_string());
942 self.log_manager.log_audit(&audit_entry);
943
944 let body = "Token budget exhausted";
946 let reset_at = std::time::SystemTime::now()
947 .duration_since(std::time::UNIX_EPOCH)
948 .unwrap_or_default()
949 .as_secs()
950 + retry_after_secs;
951
952 crate::http_helpers::write_rate_limit_error(
953 session,
954 429,
955 body,
956 0,
957 0,
958 reset_at,
959 retry_after_secs,
960 )
961 .await?;
962 return Ok(true);
963 }
964
965 let remaining = match &budget_result {
967 sentinel_common::budget::BudgetCheckResult::Allowed {
968 remaining,
969 } => *remaining as i64,
970 sentinel_common::budget::BudgetCheckResult::Soft {
971 remaining,
972 ..
973 } => *remaining,
974 _ => 0,
975 };
976 ctx.inference_budget_remaining = Some(remaining);
977
978 if let Some(status) = self
980 .inference_rate_limit_manager
981 .budget_status(route_id, rate_limit_key)
982 {
983 ctx.inference_budget_period_reset = Some(status.period_end);
984 }
985
986 trace!(
987 correlation_id = %ctx.trace_id,
988 route_id = route_id,
989 budget_remaining = remaining,
990 "Token budget check passed"
991 );
992 }
993 }
994
995 if self
997 .inference_rate_limit_manager
998 .has_cost_attribution(route_id)
999 {
1000 ctx.inference_cost_enabled = true;
1001 }
1002 }
1003 }
1004 }
1005 }
1006
1007 if let Some(ref route_config) = ctx.route_config {
1009 if let Some(ref inference) = route_config.inference {
1010 if let Some(ref guardrails) = inference.guardrails {
1011 if let Some(ref pi_config) = guardrails.prompt_injection {
1012 if pi_config.enabled && !ctx.body_buffer.is_empty() {
1013 ctx.guardrails_enabled = true;
1014
1015 if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1017 let result = self
1018 .guardrail_processor
1019 .check_prompt_injection(
1020 pi_config,
1021 &content,
1022 ctx.inference_model.as_deref(),
1023 ctx.route_id.as_deref(),
1024 &ctx.trace_id,
1025 )
1026 .await;
1027
1028 match result {
1029 PromptInjectionResult::Blocked {
1030 status,
1031 message,
1032 detections,
1033 } => {
1034 warn!(
1035 correlation_id = %ctx.trace_id,
1036 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1037 detection_count = detections.len(),
1038 "Prompt injection detected, blocking request"
1039 );
1040
1041 self.metrics.record_blocked_request("prompt_injection");
1042
1043 ctx.guardrail_detection_categories =
1045 detections.iter().map(|d| d.category.clone()).collect();
1046
1047 let audit_entry = AuditLogEntry::new(
1049 &ctx.trace_id,
1050 AuditEventType::Blocked,
1051 &ctx.method,
1052 &ctx.path,
1053 &ctx.client_ip,
1054 )
1055 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1056 .with_status_code(status)
1057 .with_reason("Prompt injection detected".to_string());
1058 self.log_manager.log_audit(&audit_entry);
1059
1060 crate::http_helpers::write_json_error(
1062 session,
1063 status,
1064 "prompt_injection_blocked",
1065 Some(&message),
1066 )
1067 .await?;
1068 return Ok(true);
1069 }
1070 PromptInjectionResult::Detected { detections } => {
1071 warn!(
1073 correlation_id = %ctx.trace_id,
1074 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1075 detection_count = detections.len(),
1076 "Prompt injection detected (logged only)"
1077 );
1078 ctx.guardrail_detection_categories =
1079 detections.iter().map(|d| d.category.clone()).collect();
1080 }
1081 PromptInjectionResult::Warning { detections } => {
1082 ctx.guardrail_warning = true;
1084 ctx.guardrail_detection_categories =
1085 detections.iter().map(|d| d.category.clone()).collect();
1086 debug!(
1087 correlation_id = %ctx.trace_id,
1088 "Prompt injection warning set"
1089 );
1090 }
1091 PromptInjectionResult::Clean => {
1092 trace!(
1093 correlation_id = %ctx.trace_id,
1094 "No prompt injection detected"
1095 );
1096 }
1097 PromptInjectionResult::Error { message } => {
1098 trace!(
1100 correlation_id = %ctx.trace_id,
1101 error = %message,
1102 "Prompt injection check error (failure mode applied)"
1103 );
1104 }
1105 }
1106 }
1107 }
1108 }
1109 }
1110 }
1111 }
1112
1113 if let Some(route_id) = ctx.route_id.as_deref() {
1115 if let Some(ref route_config) = ctx.route_config {
1116 for filter_id in &route_config.filters {
1117 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1118 ctx.geo_country_code = result.country_code.clone();
1120 ctx.geo_lookup_performed = true;
1121
1122 if !result.allowed {
1123 warn!(
1124 correlation_id = %ctx.trace_id,
1125 route_id = route_id,
1126 client_ip = %ctx.client_ip,
1127 country = ?result.country_code,
1128 filter_id = %filter_id,
1129 "Request blocked by geo filter"
1130 );
1131 self.metrics.record_blocked_request("geo_blocked");
1132
1133 let audit_entry = AuditLogEntry::new(
1135 &ctx.trace_id,
1136 AuditEventType::Blocked,
1137 &ctx.method,
1138 &ctx.path,
1139 &ctx.client_ip,
1140 )
1141 .with_route_id(route_id)
1142 .with_status_code(result.status_code)
1143 .with_reason(format!(
1144 "Geo blocked: country={}, filter={}",
1145 result.country_code.as_deref().unwrap_or("unknown"),
1146 filter_id
1147 ));
1148 self.log_manager.log_audit(&audit_entry);
1149
1150 let body = result
1152 .block_message
1153 .unwrap_or_else(|| "Access denied".to_string());
1154
1155 crate::http_helpers::write_error(
1156 session,
1157 result.status_code,
1158 &body,
1159 "text/plain",
1160 )
1161 .await?;
1162 return Ok(true); }
1164
1165 break;
1167 }
1168 }
1169 }
1170 }
1171
1172 let is_websocket_upgrade = session
1174 .req_header()
1175 .headers
1176 .get(http::header::UPGRADE)
1177 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1178 .unwrap_or(false);
1179
1180 if is_websocket_upgrade {
1181 ctx.is_websocket_upgrade = true;
1182
1183 if let Some(ref route_config) = ctx.route_config {
1185 if !route_config.websocket {
1186 warn!(
1187 correlation_id = %ctx.trace_id,
1188 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1189 client_ip = %ctx.client_ip,
1190 "WebSocket upgrade rejected: not enabled for route"
1191 );
1192
1193 self.metrics.record_blocked_request("websocket_not_enabled");
1194
1195 let audit_entry = AuditLogEntry::new(
1197 &ctx.trace_id,
1198 AuditEventType::Blocked,
1199 &ctx.method,
1200 &ctx.path,
1201 &ctx.client_ip,
1202 )
1203 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1204 .with_action("websocket_rejected")
1205 .with_reason("WebSocket not enabled for route");
1206 self.log_manager.log_audit(&audit_entry);
1207
1208 crate::http_helpers::write_error(
1210 session,
1211 403,
1212 "WebSocket not enabled for this route",
1213 "text/plain",
1214 )
1215 .await?;
1216 return Ok(true); }
1218
1219 debug!(
1220 correlation_id = %ctx.trace_id,
1221 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1222 "WebSocket upgrade request allowed"
1223 );
1224
1225 if route_config.websocket_inspection {
1227 let has_compression = session
1229 .req_header()
1230 .headers
1231 .get("Sec-WebSocket-Extensions")
1232 .and_then(|v| v.to_str().ok())
1233 .map(|s| s.contains("permessage-deflate"))
1234 .unwrap_or(false);
1235
1236 if has_compression {
1237 debug!(
1238 correlation_id = %ctx.trace_id,
1239 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1240 "WebSocket inspection skipped: permessage-deflate negotiated"
1241 );
1242 ctx.websocket_skip_inspection = true;
1243 } else {
1244 ctx.websocket_inspection_enabled = true;
1245
1246 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1248 sentinel_agent_protocol::EventType::WebSocketFrame,
1249 );
1250
1251 debug!(
1252 correlation_id = %ctx.trace_id,
1253 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1254 agent_count = ctx.websocket_inspection_agents.len(),
1255 "WebSocket frame inspection enabled"
1256 );
1257 }
1258 }
1259 }
1260 }
1261
1262 if let Some(route_config) = ctx.route_config.clone() {
1265 if route_config.service_type == sentinel_config::ServiceType::Static {
1266 trace!(
1267 correlation_id = %ctx.trace_id,
1268 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1269 "Handling static file route"
1270 );
1271 let route_match = crate::routing::RouteMatch {
1273 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1274 config: route_config.clone(),
1275 };
1276 return self.handle_static_route(session, ctx, &route_match).await;
1277 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1278 trace!(
1279 correlation_id = %ctx.trace_id,
1280 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1281 builtin_handler = ?route_config.builtin_handler,
1282 "Handling builtin route"
1283 );
1284 let route_match = crate::routing::RouteMatch {
1286 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1287 config: route_config.clone(),
1288 };
1289 return self.handle_builtin_route(session, ctx, &route_match).await;
1290 }
1291 }
1292
1293 if let Some(route_id) = ctx.route_id.clone() {
1295 if let Some(validator) = self.validators.get(&route_id).await {
1296 trace!(
1297 correlation_id = %ctx.trace_id,
1298 route_id = %route_id,
1299 "Running API schema validation"
1300 );
1301 if let Some(result) = self
1302 .validate_api_request(session, ctx, &route_id, &validator)
1303 .await?
1304 {
1305 debug!(
1306 correlation_id = %ctx.trace_id,
1307 route_id = %route_id,
1308 validation_passed = result,
1309 "API validation complete"
1310 );
1311 return Ok(result);
1312 }
1313 }
1314 }
1315
1316 let client_addr = session
1318 .client_addr()
1319 .map(|a| format!("{}", a))
1320 .unwrap_or_else(|| "unknown".to_string());
1321 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1322
1323 let req_header = session.req_header_mut();
1324
1325 req_header
1327 .insert_header("X-Correlation-Id", &ctx.trace_id)
1328 .ok();
1329 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1330
1331 let config = ctx
1333 .config
1334 .get_or_insert_with(|| self.config_manager.current());
1335
1336 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
1341 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1342 && header_count > config.limits.max_header_count
1343 {
1344 warn!(
1345 correlation_id = %ctx.trace_id,
1346 header_count = header_count,
1347 limit = config.limits.max_header_count,
1348 "Request blocked: exceeds header count limit"
1349 );
1350
1351 self.metrics.record_blocked_request("header_count_exceeded");
1352 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1353 }
1354
1355 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1357 let total_header_size: usize = req_header
1358 .headers
1359 .iter()
1360 .map(|(k, v)| k.as_str().len() + v.len())
1361 .sum();
1362
1363 if total_header_size > config.limits.max_header_size_bytes {
1364 warn!(
1365 correlation_id = %ctx.trace_id,
1366 header_size = total_header_size,
1367 limit = config.limits.max_header_size_bytes,
1368 "Request blocked: exceeds header size limit"
1369 );
1370
1371 self.metrics.record_blocked_request("header_size_exceeded");
1372 return Err(Error::explain(
1373 ErrorType::InternalError,
1374 "Headers too large",
1375 ));
1376 }
1377 }
1378
1379 trace!(
1381 correlation_id = %ctx.trace_id,
1382 "Processing request through agents"
1383 );
1384 if let Err(e) = self
1385 .process_agents(session, ctx, &client_addr, client_port)
1386 .await
1387 {
1388 if let ErrorType::HTTPStatus(status) = e.etype() {
1391 let error_msg = e.to_string();
1393 let body = error_msg
1394 .split("context:")
1395 .nth(1)
1396 .map(|s| s.trim())
1397 .unwrap_or("Request blocked");
1398 debug!(
1399 correlation_id = %ctx.trace_id,
1400 status = status,
1401 body = %body,
1402 "Sending HTTP error response for agent block"
1403 );
1404 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1405 return Ok(true); }
1407 return Err(e);
1409 }
1410
1411 trace!(
1412 correlation_id = %ctx.trace_id,
1413 "Request filter phase complete, forwarding to upstream"
1414 );
1415
1416 Ok(false) }
1418
1419 async fn request_body_filter(
1426 &self,
1427 _session: &mut Session,
1428 body: &mut Option<Bytes>,
1429 end_of_stream: bool,
1430 ctx: &mut Self::CTX,
1431 ) -> Result<(), Box<Error>> {
1432 use sentinel_config::BodyStreamingMode;
1433
1434 if ctx.is_websocket_upgrade {
1436 if let Some(ref handler) = ctx.websocket_handler {
1437 let result = handler.process_client_data(body.take()).await;
1438 match result {
1439 crate::websocket::ProcessResult::Forward(data) => {
1440 *body = data;
1441 }
1442 crate::websocket::ProcessResult::Close(reason) => {
1443 warn!(
1444 correlation_id = %ctx.trace_id,
1445 code = reason.code,
1446 reason = %reason.reason,
1447 "WebSocket connection closed by agent (client->server)"
1448 );
1449 return Err(Error::explain(
1451 ErrorType::InternalError,
1452 format!("WebSocket closed: {} {}", reason.code, reason.reason),
1453 ));
1454 }
1455 }
1456 }
1457 return Ok(());
1459 }
1460
1461 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1463 if chunk_len > 0 {
1464 ctx.request_body_bytes += chunk_len as u64;
1465
1466 trace!(
1467 correlation_id = %ctx.trace_id,
1468 chunk_size = chunk_len,
1469 total_body_bytes = ctx.request_body_bytes,
1470 end_of_stream = end_of_stream,
1471 streaming_mode = ?ctx.request_body_streaming_mode,
1472 "Processing request body chunk"
1473 );
1474
1475 let config = ctx
1477 .config
1478 .get_or_insert_with(|| self.config_manager.current());
1479 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1480 warn!(
1481 correlation_id = %ctx.trace_id,
1482 body_bytes = ctx.request_body_bytes,
1483 limit = config.limits.max_body_size_bytes,
1484 "Request body size limit exceeded"
1485 );
1486 self.metrics.record_blocked_request("body_size_exceeded");
1487 return Err(Error::explain(
1488 ErrorType::InternalError,
1489 "Request body too large",
1490 ));
1491 }
1492 }
1493
1494 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1496 let config = ctx
1497 .config
1498 .get_or_insert_with(|| self.config_manager.current());
1499 let max_inspection_bytes = config
1500 .waf
1501 .as_ref()
1502 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1503 .unwrap_or(1024 * 1024);
1504
1505 match ctx.request_body_streaming_mode {
1506 BodyStreamingMode::Stream => {
1507 if body.is_some() {
1509 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1510 .await?;
1511 } else if end_of_stream && ctx.agent_needs_more {
1512 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1514 .await?;
1515 }
1516 }
1517 BodyStreamingMode::Hybrid { buffer_threshold } => {
1518 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1520 if let Some(ref chunk) = body {
1522 let bytes_to_buffer = std::cmp::min(
1523 chunk.len(),
1524 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1525 );
1526 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1527 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1528
1529 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1531 {
1532 self.send_buffered_body_to_agents(
1534 end_of_stream && chunk.len() == bytes_to_buffer,
1535 ctx,
1536 )
1537 .await?;
1538 ctx.body_buffer.clear();
1539
1540 if bytes_to_buffer < chunk.len() {
1542 let remaining = chunk.slice(bytes_to_buffer..);
1543 let mut remaining_body = Some(remaining);
1544 self.process_body_chunk_streaming(
1545 &mut remaining_body,
1546 end_of_stream,
1547 ctx,
1548 )
1549 .await?;
1550 }
1551 }
1552 }
1553 } else {
1554 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1556 .await?;
1557 }
1558 }
1559 BodyStreamingMode::Buffer => {
1560 if let Some(ref chunk) = body {
1562 if ctx.body_bytes_inspected < max_inspection_bytes {
1563 let bytes_to_inspect = std::cmp::min(
1564 chunk.len() as u64,
1565 max_inspection_bytes - ctx.body_bytes_inspected,
1566 ) as usize;
1567
1568 ctx.body_buffer
1569 .extend_from_slice(&chunk[..bytes_to_inspect]);
1570 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1571
1572 trace!(
1573 correlation_id = %ctx.trace_id,
1574 bytes_inspected = ctx.body_bytes_inspected,
1575 max_inspection_bytes = max_inspection_bytes,
1576 buffer_size = ctx.body_buffer.len(),
1577 "Buffering body for agent inspection"
1578 );
1579 }
1580 }
1581
1582 let should_send =
1584 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1585 if should_send && !ctx.body_buffer.is_empty() {
1586 self.send_buffered_body_to_agents(end_of_stream, ctx)
1587 .await?;
1588 ctx.body_buffer.clear();
1589 }
1590 }
1591 }
1592 }
1593
1594 if end_of_stream {
1595 trace!(
1596 correlation_id = %ctx.trace_id,
1597 total_body_bytes = ctx.request_body_bytes,
1598 bytes_inspected = ctx.body_bytes_inspected,
1599 "Request body complete"
1600 );
1601 }
1602
1603 Ok(())
1604 }
1605
1606 async fn response_filter(
1607 &self,
1608 _session: &mut Session,
1609 upstream_response: &mut ResponseHeader,
1610 ctx: &mut Self::CTX,
1611 ) -> Result<(), Box<Error>> {
1612 let status = upstream_response.status.as_u16();
1613 let duration = ctx.elapsed();
1614
1615 trace!(
1616 correlation_id = %ctx.trace_id,
1617 status = status,
1618 "Starting response filter phase"
1619 );
1620
1621 if status == 101 && ctx.is_websocket_upgrade {
1623 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1624 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1626 self.agent_manager.clone(),
1627 ctx.route_id
1628 .clone()
1629 .unwrap_or_else(|| "unknown".to_string()),
1630 ctx.trace_id.clone(),
1631 ctx.client_ip.clone(),
1632 100, Some(self.metrics.clone()),
1634 );
1635
1636 let handler = crate::websocket::WebSocketHandler::new(
1637 std::sync::Arc::new(inspector),
1638 1024 * 1024, );
1640
1641 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1642
1643 info!(
1644 correlation_id = %ctx.trace_id,
1645 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1646 agent_count = ctx.websocket_inspection_agents.len(),
1647 "WebSocket upgrade successful, frame inspection enabled"
1648 );
1649 } else if ctx.websocket_skip_inspection {
1650 debug!(
1651 correlation_id = %ctx.trace_id,
1652 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1653 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1654 );
1655 } else {
1656 debug!(
1657 correlation_id = %ctx.trace_id,
1658 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1659 "WebSocket upgrade successful"
1660 );
1661 }
1662 }
1663
1664 trace!(
1666 correlation_id = %ctx.trace_id,
1667 "Applying security headers"
1668 );
1669 self.apply_security_headers(upstream_response).ok();
1670
1671 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1673
1674 if let Some(ref rate_info) = ctx.rate_limit_info {
1676 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1677 upstream_response
1678 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1679 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1680 }
1681
1682 if ctx.inference_budget_enabled {
1684 if let Some(remaining) = ctx.inference_budget_remaining {
1685 upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1686 }
1687 if let Some(period_reset) = ctx.inference_budget_period_reset {
1688 let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1690 .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1691 .unwrap_or_else(|| period_reset.to_string());
1692 upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1693 }
1694 }
1695
1696 if let Some(ref country_code) = ctx.geo_country_code {
1698 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1699 }
1700
1701 if ctx.sticky_session_new_assignment {
1703 if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1704 upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1705 trace!(
1706 correlation_id = %ctx.trace_id,
1707 sticky_target_index = ?ctx.sticky_target_index,
1708 "Added sticky session Set-Cookie header"
1709 );
1710 }
1711 }
1712
1713 if ctx.guardrail_warning {
1715 upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1716 }
1717
1718 if ctx.used_fallback() {
1720 upstream_response.insert_header("X-Fallback-Used", "true")?;
1721
1722 if let Some(ref upstream) = ctx.upstream {
1723 upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1724 }
1725
1726 if let Some(ref reason) = ctx.fallback_reason {
1727 upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1728 }
1729
1730 if let Some(ref original) = ctx.original_upstream {
1731 upstream_response.insert_header("X-Original-Upstream", original)?;
1732 }
1733
1734 if let Some(ref mapping) = ctx.model_mapping_applied {
1735 upstream_response
1736 .insert_header("X-Model-Mapping", format!("{} -> {}", mapping.0, mapping.1))?;
1737 }
1738
1739 trace!(
1740 correlation_id = %ctx.trace_id,
1741 fallback_attempt = ctx.fallback_attempt,
1742 fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1743 original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1744 "Added fallback response headers"
1745 );
1746
1747 if status < 400 {
1749 if let Some(metrics) = get_fallback_metrics() {
1750 metrics.record_fallback_success(
1751 ctx.route_id.as_deref().unwrap_or("unknown"),
1752 ctx.upstream.as_deref().unwrap_or("unknown"),
1753 );
1754 }
1755 }
1756 }
1757
1758 if ctx.inference_rate_limit_enabled {
1760 let content_type = upstream_response
1762 .headers
1763 .get("content-type")
1764 .and_then(|ct| ct.to_str().ok());
1765
1766 if is_sse_response(content_type) {
1767 let provider = ctx
1769 .route_config
1770 .as_ref()
1771 .and_then(|r| r.inference.as_ref())
1772 .map(|i| i.provider)
1773 .unwrap_or_default();
1774
1775 ctx.inference_streaming_response = true;
1776 ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1777 provider,
1778 ctx.inference_model.clone(),
1779 ));
1780
1781 trace!(
1782 correlation_id = %ctx.trace_id,
1783 content_type = ?content_type,
1784 model = ?ctx.inference_model,
1785 "Initialized streaming token counter for SSE response"
1786 );
1787 }
1788 }
1789
1790 if status >= 400 {
1792 trace!(
1793 correlation_id = %ctx.trace_id,
1794 status = status,
1795 "Handling error response"
1796 );
1797 self.handle_error_response(upstream_response, ctx).await?;
1798 }
1799
1800 self.metrics.record_request(
1802 ctx.route_id.as_deref().unwrap_or("unknown"),
1803 &ctx.method,
1804 status,
1805 duration,
1806 );
1807
1808 if let Some(ref mut span) = ctx.otel_span {
1810 span.set_status(status);
1811 if let Some(ref upstream) = ctx.upstream {
1812 span.set_upstream(upstream, "");
1813 }
1814 if status >= 500 {
1815 span.record_error(&format!("HTTP {}", status));
1816 }
1817 }
1818
1819 if let Some(ref upstream) = ctx.upstream {
1821 let success = status < 500;
1822
1823 trace!(
1824 correlation_id = %ctx.trace_id,
1825 upstream = %upstream,
1826 success = success,
1827 status = status,
1828 "Recording passive health check result"
1829 );
1830
1831 let error_msg = if !success {
1832 Some(format!("HTTP {}", status))
1833 } else {
1834 None
1835 };
1836 self.passive_health
1837 .record_outcome(upstream, success, error_msg.as_deref())
1838 .await;
1839
1840 if let Some(pool) = self.upstream_pools.get(upstream).await {
1842 pool.report_result(upstream, success).await;
1843 }
1844
1845 if !success {
1846 warn!(
1847 correlation_id = %ctx.trace_id,
1848 upstream = %upstream,
1849 status = status,
1850 "Upstream returned error status"
1851 );
1852 }
1853 }
1854
1855 if status >= 500 {
1857 error!(
1858 correlation_id = %ctx.trace_id,
1859 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1860 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1861 method = %ctx.method,
1862 path = %ctx.path,
1863 status = status,
1864 duration_ms = duration.as_millis(),
1865 attempts = ctx.upstream_attempts,
1866 "Request completed with server error"
1867 );
1868 } else if status >= 400 {
1869 warn!(
1870 correlation_id = %ctx.trace_id,
1871 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1872 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1873 method = %ctx.method,
1874 path = %ctx.path,
1875 status = status,
1876 duration_ms = duration.as_millis(),
1877 "Request completed with client error"
1878 );
1879 } else {
1880 debug!(
1881 correlation_id = %ctx.trace_id,
1882 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1883 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1884 method = %ctx.method,
1885 path = %ctx.path,
1886 status = status,
1887 duration_ms = duration.as_millis(),
1888 attempts = ctx.upstream_attempts,
1889 "Request completed"
1890 );
1891 }
1892
1893 Ok(())
1894 }
1895
1896 async fn upstream_request_filter(
1899 &self,
1900 _session: &mut Session,
1901 upstream_request: &mut pingora::http::RequestHeader,
1902 ctx: &mut Self::CTX,
1903 ) -> Result<()>
1904 where
1905 Self::CTX: Send + Sync,
1906 {
1907 trace!(
1908 correlation_id = %ctx.trace_id,
1909 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1910 "Applying upstream request modifications"
1911 );
1912
1913 upstream_request
1915 .insert_header("X-Trace-Id", &ctx.trace_id)
1916 .ok();
1917
1918 if let Some(ref span) = ctx.otel_span {
1920 let sampled = ctx
1921 .trace_context
1922 .as_ref()
1923 .map(|c| c.sampled)
1924 .unwrap_or(true);
1925 let traceparent =
1926 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1927 upstream_request
1928 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1929 .ok();
1930 }
1931
1932 upstream_request
1934 .insert_header("X-Forwarded-By", "Sentinel")
1935 .ok();
1936
1937 if let Some(ref route_config) = ctx.route_config {
1941 let mods = &route_config.policies.request_headers;
1942
1943 for (name, value) in &mods.set {
1945 upstream_request
1946 .insert_header(name.clone(), value.as_str())
1947 .ok();
1948 }
1949
1950 for (name, value) in &mods.add {
1952 upstream_request
1953 .append_header(name.clone(), value.as_str())
1954 .ok();
1955 }
1956
1957 for name in &mods.remove {
1959 upstream_request.remove_header(name);
1960 }
1961
1962 trace!(
1963 correlation_id = %ctx.trace_id,
1964 "Applied request header modifications"
1965 );
1966 }
1967
1968 upstream_request.remove_header("X-Internal-Token");
1970 upstream_request.remove_header("Authorization-Internal");
1971
1972 if let Some(ref route_config) = ctx.route_config {
1975 if let Some(ref shadow_config) = route_config.shadow {
1976 let pools_snapshot = self.upstream_pools.snapshot().await;
1978 let upstream_pools = std::sync::Arc::new(pools_snapshot);
1979
1980 let route_id = ctx
1982 .route_id
1983 .clone()
1984 .unwrap_or_else(|| "unknown".to_string());
1985
1986 let shadow_manager = crate::shadow::ShadowManager::new(
1988 upstream_pools,
1989 shadow_config.clone(),
1990 Some(std::sync::Arc::clone(&self.metrics)),
1991 route_id,
1992 );
1993
1994 if shadow_manager.should_shadow(upstream_request) {
1996 trace!(
1997 correlation_id = %ctx.trace_id,
1998 shadow_upstream = %shadow_config.upstream,
1999 percentage = shadow_config.percentage,
2000 "Shadowing request"
2001 );
2002
2003 let shadow_headers = upstream_request.clone();
2005
2006 let shadow_ctx = crate::upstream::RequestContext {
2008 client_ip: ctx.client_ip.parse().ok(),
2009 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
2011 method: ctx.method.clone(),
2012 };
2013
2014 let buffer_body = shadow_config.buffer_body
2016 && crate::shadow::should_buffer_method(&ctx.method);
2017
2018 if buffer_body {
2019 trace!(
2023 correlation_id = %ctx.trace_id,
2024 "Deferring shadow request until body is buffered"
2025 );
2026 ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2027 headers: shadow_headers,
2028 manager: std::sync::Arc::new(shadow_manager),
2029 request_ctx: shadow_ctx,
2030 include_body: true,
2031 });
2032 if !ctx.body_inspection_enabled {
2035 ctx.body_inspection_enabled = true;
2036 }
2039 } else {
2040 shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2042 ctx.shadow_sent = true;
2043 }
2044 }
2045 }
2046 }
2047
2048 Ok(())
2049 }
2050
2051 fn response_body_filter(
2057 &self,
2058 _session: &mut Session,
2059 body: &mut Option<Bytes>,
2060 end_of_stream: bool,
2061 ctx: &mut Self::CTX,
2062 ) -> Result<Option<Duration>, Box<Error>> {
2063 if ctx.is_websocket_upgrade {
2066 if let Some(ref handler) = ctx.websocket_handler {
2067 let handler = handler.clone();
2068 let data = body.take();
2069
2070 let result = tokio::task::block_in_place(|| {
2073 tokio::runtime::Handle::current()
2074 .block_on(async { handler.process_server_data(data).await })
2075 });
2076
2077 match result {
2078 crate::websocket::ProcessResult::Forward(data) => {
2079 *body = data;
2080 }
2081 crate::websocket::ProcessResult::Close(reason) => {
2082 warn!(
2083 correlation_id = %ctx.trace_id,
2084 code = reason.code,
2085 reason = %reason.reason,
2086 "WebSocket connection closed by agent (server->client)"
2087 );
2088 let close_frame =
2091 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2092 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2093 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2094 *body = Some(Bytes::from(encoded));
2095 }
2096 }
2097 }
2098 }
2099 return Ok(None);
2101 }
2102
2103 if let Some(ref chunk) = body {
2105 ctx.response_bytes += chunk.len() as u64;
2106
2107 trace!(
2108 correlation_id = %ctx.trace_id,
2109 chunk_size = chunk.len(),
2110 total_response_bytes = ctx.response_bytes,
2111 end_of_stream = end_of_stream,
2112 "Processing response body chunk"
2113 );
2114
2115 if let Some(ref mut counter) = ctx.inference_streaming_counter {
2117 let result = counter.process_chunk(chunk);
2118
2119 if result.content.is_some() || result.is_done {
2120 trace!(
2121 correlation_id = %ctx.trace_id,
2122 has_content = result.content.is_some(),
2123 is_done = result.is_done,
2124 chunks_processed = counter.chunks_processed(),
2125 accumulated_content_len = counter.content().len(),
2126 "Processed SSE chunk for token counting"
2127 );
2128 }
2129 }
2130
2131 if ctx.response_body_inspection_enabled
2135 && !ctx.response_body_inspection_agents.is_empty()
2136 {
2137 let config = ctx
2138 .config
2139 .get_or_insert_with(|| self.config_manager.current());
2140 let max_inspection_bytes = config
2141 .waf
2142 .as_ref()
2143 .map(|w| w.body_inspection.max_inspection_bytes as u64)
2144 .unwrap_or(1024 * 1024);
2145
2146 if ctx.response_body_bytes_inspected < max_inspection_bytes {
2147 let bytes_to_inspect = std::cmp::min(
2148 chunk.len() as u64,
2149 max_inspection_bytes - ctx.response_body_bytes_inspected,
2150 ) as usize;
2151
2152 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2156 ctx.response_body_chunk_index += 1;
2157
2158 trace!(
2159 correlation_id = %ctx.trace_id,
2160 bytes_inspected = ctx.response_body_bytes_inspected,
2161 max_inspection_bytes = max_inspection_bytes,
2162 chunk_index = ctx.response_body_chunk_index,
2163 "Tracking response body for inspection"
2164 );
2165 }
2166 }
2167 }
2168
2169 if end_of_stream {
2170 trace!(
2171 correlation_id = %ctx.trace_id,
2172 total_response_bytes = ctx.response_bytes,
2173 response_bytes_inspected = ctx.response_body_bytes_inspected,
2174 "Response body complete"
2175 );
2176 }
2177
2178 Ok(None)
2180 }
2181
2182 async fn connected_to_upstream(
2185 &self,
2186 _session: &mut Session,
2187 reused: bool,
2188 peer: &HttpPeer,
2189 #[cfg(unix)] _fd: RawFd,
2190 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2191 digest: Option<&Digest>,
2192 ctx: &mut Self::CTX,
2193 ) -> Result<(), Box<Error>> {
2194 ctx.connection_reused = reused;
2196
2197 if reused {
2199 trace!(
2200 correlation_id = %ctx.trace_id,
2201 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2202 peer_address = %peer.address(),
2203 "Reusing existing upstream connection"
2204 );
2205 } else {
2206 debug!(
2207 correlation_id = %ctx.trace_id,
2208 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2209 peer_address = %peer.address(),
2210 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2211 "Established new upstream connection"
2212 );
2213 }
2214
2215 Ok(())
2216 }
2217
2218 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2228 let route_id = match ctx.route_id.as_deref() {
2230 Some(id) => id,
2231 None => {
2232 trace!(
2233 correlation_id = %ctx.trace_id,
2234 "Cache filter: no route ID, skipping cache"
2235 );
2236 return Ok(());
2237 }
2238 };
2239
2240 if !self.cache_manager.is_enabled(route_id) {
2242 trace!(
2243 correlation_id = %ctx.trace_id,
2244 route_id = %route_id,
2245 "Cache disabled for route"
2246 );
2247 return Ok(());
2248 }
2249
2250 if !self
2252 .cache_manager
2253 .is_method_cacheable(route_id, &ctx.method)
2254 {
2255 trace!(
2256 correlation_id = %ctx.trace_id,
2257 route_id = %route_id,
2258 method = %ctx.method,
2259 "Method not cacheable"
2260 );
2261 return Ok(());
2262 }
2263
2264 debug!(
2266 correlation_id = %ctx.trace_id,
2267 route_id = %route_id,
2268 method = %ctx.method,
2269 path = %ctx.path,
2270 "Enabling HTTP caching for request"
2271 );
2272
2273 let storage = get_cache_storage();
2275 let eviction = get_cache_eviction();
2276 let cache_lock = get_cache_lock();
2277
2278 session.cache.enable(
2280 storage,
2281 Some(eviction),
2282 None, Some(cache_lock),
2284 None, );
2286
2287 ctx.cache_eligible = true;
2289
2290 trace!(
2291 correlation_id = %ctx.trace_id,
2292 route_id = %route_id,
2293 cache_enabled = session.cache.enabled(),
2294 "Cache enabled for request"
2295 );
2296
2297 Ok(())
2298 }
2299
2300 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2305 let req_header = session.req_header();
2306 let method = req_header.method.as_str();
2307 let path = req_header.uri.path();
2308 let host = ctx.host.as_deref().unwrap_or("unknown");
2309 let query = req_header.uri.query();
2310
2311 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2313
2314 trace!(
2315 correlation_id = %ctx.trace_id,
2316 cache_key = %key_string,
2317 "Generated cache key"
2318 );
2319
2320 Ok(CacheKey::default(req_header))
2323 }
2324
2325 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2330 session.cache.cache_miss();
2332
2333 if let Some(route_id) = ctx.route_id.as_deref() {
2335 self.cache_manager.stats().record_miss();
2336
2337 trace!(
2338 correlation_id = %ctx.trace_id,
2339 route_id = %route_id,
2340 path = %ctx.path,
2341 "Cache miss"
2342 );
2343 }
2344 }
2345
2346 async fn cache_hit_filter(
2352 &self,
2353 session: &mut Session,
2354 meta: &CacheMeta,
2355 _hit_handler: &mut HitHandler,
2356 is_fresh: bool,
2357 ctx: &mut Self::CTX,
2358 ) -> Result<Option<ForcedFreshness>>
2359 where
2360 Self::CTX: Send + Sync,
2361 {
2362 let req_header = session.req_header();
2364 let method = req_header.method.as_str();
2365 let path = req_header.uri.path();
2366 let host = req_header.uri.host().unwrap_or("localhost");
2367 let query = req_header.uri.query();
2368
2369 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2371
2372 if self.cache_manager.should_invalidate(&cache_key) {
2374 info!(
2375 correlation_id = %ctx.trace_id,
2376 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2377 cache_key = %cache_key,
2378 "Cache entry invalidated by purge request"
2379 );
2380 return Ok(Some(ForcedFreshness::ForceExpired));
2382 }
2383
2384 if is_fresh {
2386 self.cache_manager.stats().record_hit();
2387
2388 debug!(
2389 correlation_id = %ctx.trace_id,
2390 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2391 is_fresh = is_fresh,
2392 "Cache hit (fresh)"
2393 );
2394 } else {
2395 trace!(
2396 correlation_id = %ctx.trace_id,
2397 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2398 is_fresh = is_fresh,
2399 "Cache hit (stale)"
2400 );
2401 }
2402
2403 Ok(None)
2405 }
2406
2407 fn response_cache_filter(
2412 &self,
2413 _session: &Session,
2414 resp: &ResponseHeader,
2415 ctx: &mut Self::CTX,
2416 ) -> Result<RespCacheable> {
2417 let route_id = match ctx.route_id.as_deref() {
2418 Some(id) => id,
2419 None => {
2420 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2421 "no_route",
2422 )));
2423 }
2424 };
2425
2426 if !self.cache_manager.is_enabled(route_id) {
2428 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2429 "disabled",
2430 )));
2431 }
2432
2433 let status = resp.status.as_u16();
2434
2435 if !self.cache_manager.is_status_cacheable(route_id, status) {
2437 trace!(
2438 correlation_id = %ctx.trace_id,
2439 route_id = %route_id,
2440 status = status,
2441 "Status code not cacheable"
2442 );
2443 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2444 }
2445
2446 if let Some(cache_control) = resp.headers.get("cache-control") {
2448 if let Ok(cc_str) = cache_control.to_str() {
2449 if crate::cache::CacheManager::is_no_cache(cc_str) {
2450 trace!(
2451 correlation_id = %ctx.trace_id,
2452 route_id = %route_id,
2453 cache_control = %cc_str,
2454 "Response has no-cache directive"
2455 );
2456 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2457 }
2458 }
2459 }
2460
2461 let cache_control = resp
2463 .headers
2464 .get("cache-control")
2465 .and_then(|v| v.to_str().ok());
2466 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2467
2468 if ttl.is_zero() {
2469 trace!(
2470 correlation_id = %ctx.trace_id,
2471 route_id = %route_id,
2472 "TTL is zero, not caching"
2473 );
2474 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2475 }
2476
2477 let config = self
2479 .cache_manager
2480 .get_route_config(route_id)
2481 .unwrap_or_default();
2482
2483 let now = std::time::SystemTime::now();
2485 let fresh_until = now + ttl;
2486
2487 let header = resp.clone();
2489
2490 let cache_meta = CacheMeta::new(
2492 fresh_until,
2493 now,
2494 config.stale_while_revalidate_secs as u32,
2495 config.stale_if_error_secs as u32,
2496 header,
2497 );
2498
2499 self.cache_manager.stats().record_store();
2501
2502 debug!(
2503 correlation_id = %ctx.trace_id,
2504 route_id = %route_id,
2505 status = status,
2506 ttl_secs = ttl.as_secs(),
2507 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2508 stale_if_error_secs = config.stale_if_error_secs,
2509 "Caching response"
2510 );
2511
2512 Ok(RespCacheable::Cacheable(cache_meta))
2513 }
2514
2515 fn should_serve_stale(
2519 &self,
2520 _session: &mut Session,
2521 ctx: &mut Self::CTX,
2522 error: Option<&Error>,
2523 ) -> bool {
2524 let route_id = match ctx.route_id.as_deref() {
2525 Some(id) => id,
2526 None => return false,
2527 };
2528
2529 let config = match self.cache_manager.get_route_config(route_id) {
2531 Some(c) => c,
2532 None => return false,
2533 };
2534
2535 if let Some(e) = error {
2537 if e.esource() == &pingora::ErrorSource::Upstream {
2539 debug!(
2540 correlation_id = %ctx.trace_id,
2541 route_id = %route_id,
2542 error = %e,
2543 stale_if_error_secs = config.stale_if_error_secs,
2544 "Considering stale-if-error"
2545 );
2546 return config.stale_if_error_secs > 0;
2547 }
2548 }
2549
2550 if error.is_none() && config.stale_while_revalidate_secs > 0 {
2552 trace!(
2553 correlation_id = %ctx.trace_id,
2554 route_id = %route_id,
2555 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2556 "Allowing stale-while-revalidate"
2557 );
2558 return true;
2559 }
2560
2561 false
2562 }
2563
2564 fn range_header_filter(
2574 &self,
2575 session: &mut Session,
2576 response: &mut ResponseHeader,
2577 ctx: &mut Self::CTX,
2578 ) -> pingora_proxy::RangeType
2579 where
2580 Self::CTX: Send + Sync,
2581 {
2582 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2584 matches!(
2586 config.service_type,
2587 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2588 )
2589 });
2590
2591 if !supports_range {
2592 trace!(
2593 correlation_id = %ctx.trace_id,
2594 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2595 "Range request not supported for this route type"
2596 );
2597 return pingora_proxy::RangeType::None;
2598 }
2599
2600 let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
2602
2603 match &range_type {
2604 pingora_proxy::RangeType::None => {
2605 trace!(
2606 correlation_id = %ctx.trace_id,
2607 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2608 "No range request or not applicable"
2609 );
2610 }
2611 pingora_proxy::RangeType::Single(range) => {
2612 trace!(
2613 correlation_id = %ctx.trace_id,
2614 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2615 range_start = range.start,
2616 range_end = range.end,
2617 "Processing single-range request"
2618 );
2619 }
2620 pingora_proxy::RangeType::Multi(multi) => {
2621 trace!(
2622 correlation_id = %ctx.trace_id,
2623 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2624 range_count = multi.ranges.len(),
2625 "Processing multi-range request"
2626 );
2627 }
2628 pingora_proxy::RangeType::Invalid => {
2629 debug!(
2630 correlation_id = %ctx.trace_id,
2631 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2632 "Invalid range header"
2633 );
2634 }
2635 }
2636
2637 range_type
2638 }
2639
2640 async fn fail_to_proxy(
2643 &self,
2644 session: &mut Session,
2645 e: &Error,
2646 ctx: &mut Self::CTX,
2647 ) -> pingora_proxy::FailToProxy
2648 where
2649 Self::CTX: Send + Sync,
2650 {
2651 let error_code = match e.etype() {
2652 ErrorType::ConnectRefused => 503,
2654 ErrorType::ConnectTimedout => 504,
2655 ErrorType::ConnectNoRoute => 502,
2656
2657 ErrorType::ReadTimedout => 504,
2659 ErrorType::WriteTimedout => 504,
2660
2661 ErrorType::TLSHandshakeFailure => 502,
2663 ErrorType::InvalidCert => 502,
2664
2665 ErrorType::InvalidHTTPHeader => 400,
2667 ErrorType::H2Error => 502,
2668
2669 ErrorType::ConnectProxyFailure => 502,
2671 ErrorType::ConnectionClosed => 502,
2672
2673 ErrorType::HTTPStatus(status) => *status,
2675
2676 ErrorType::InternalError => {
2678 let error_str = e.to_string();
2680 if error_str.contains("upstream")
2681 || error_str.contains("DNS")
2682 || error_str.contains("resolve")
2683 {
2684 502
2685 } else {
2686 500
2687 }
2688 }
2689
2690 _ => 502,
2692 };
2693
2694 error!(
2695 correlation_id = %ctx.trace_id,
2696 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2697 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2698 error_type = ?e.etype(),
2699 error = %e,
2700 error_code = error_code,
2701 "Proxy error occurred"
2702 );
2703
2704 self.metrics
2706 .record_blocked_request(&format!("proxy_error_{}", error_code));
2707
2708 let error_message = match error_code {
2712 400 => "Bad Request",
2713 502 => "Bad Gateway",
2714 503 => "Service Unavailable",
2715 504 => "Gateway Timeout",
2716 _ => "Internal Server Error",
2717 };
2718
2719 let body = format!(
2721 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2722 error_code, error_message, ctx.trace_id
2723 );
2724
2725 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2727 header
2728 .insert_header("Content-Type", "application/json")
2729 .ok();
2730 header
2731 .insert_header("Content-Length", body.len().to_string())
2732 .ok();
2733 header
2734 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2735 .ok();
2736 header.insert_header("Connection", "close").ok();
2737
2738 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2740 warn!(
2741 correlation_id = %ctx.trace_id,
2742 error = %write_err,
2743 "Failed to write error response header"
2744 );
2745 } else {
2746 if let Err(write_err) = session
2748 .write_response_body(Some(bytes::Bytes::from(body)), true)
2749 .await
2750 {
2751 warn!(
2752 correlation_id = %ctx.trace_id,
2753 error = %write_err,
2754 "Failed to write error response body"
2755 );
2756 }
2757 }
2758
2759 pingora_proxy::FailToProxy {
2762 error_code,
2763 can_reuse_downstream: false,
2764 }
2765 }
2766
2767 fn error_while_proxy(
2773 &self,
2774 peer: &HttpPeer,
2775 session: &mut Session,
2776 e: Box<Error>,
2777 ctx: &mut Self::CTX,
2778 client_reused: bool,
2779 ) -> Box<Error> {
2780 let error_type = e.etype().clone();
2781 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2782
2783 let is_retryable = matches!(
2785 error_type,
2786 ErrorType::ConnectTimedout
2787 | ErrorType::ReadTimedout
2788 | ErrorType::WriteTimedout
2789 | ErrorType::ConnectionClosed
2790 | ErrorType::ConnectRefused
2791 );
2792
2793 warn!(
2795 correlation_id = %ctx.trace_id,
2796 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2797 upstream = %upstream_id,
2798 peer_address = %peer.address(),
2799 error_type = ?error_type,
2800 error = %e,
2801 client_reused = client_reused,
2802 is_retryable = is_retryable,
2803 "Error during proxy operation"
2804 );
2805
2806 let peer_address = peer.address().to_string();
2809 let upstream_pools = self.upstream_pools.clone();
2810 let upstream_id_owned = upstream_id.to_string();
2811 tokio::spawn(async move {
2812 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2813 pool.report_result(&peer_address, false).await;
2814 }
2815 });
2816
2817 self.metrics
2819 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2820
2821 let mut enhanced_error = e.more_context(format!(
2823 "Upstream: {}, Peer: {}, Attempts: {}",
2824 upstream_id,
2825 peer.address(),
2826 ctx.upstream_attempts
2827 ));
2828
2829 if is_retryable {
2834 let can_retry = if client_reused {
2835 !session.as_ref().retry_buffer_truncated()
2837 } else {
2838 true
2840 };
2841
2842 enhanced_error.retry.decide_reuse(can_retry);
2843
2844 if can_retry {
2845 debug!(
2846 correlation_id = %ctx.trace_id,
2847 upstream = %upstream_id,
2848 error_type = ?error_type,
2849 "Error is retryable, will attempt retry"
2850 );
2851 }
2852 } else {
2853 enhanced_error.retry.decide_reuse(false);
2855 }
2856
2857 enhanced_error
2858 }
2859
2860 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2861 self.reload_coordinator.dec_requests();
2863
2864 if !ctx.shadow_sent {
2866 if let Some(shadow_pending) = ctx.shadow_pending.take() {
2867 let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2868 Some(ctx.body_buffer.clone())
2870 } else {
2871 None
2872 };
2873
2874 trace!(
2875 correlation_id = %ctx.trace_id,
2876 body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2877 "Firing deferred shadow request with buffered body"
2878 );
2879
2880 shadow_pending.manager.shadow_request(
2881 shadow_pending.headers,
2882 body,
2883 shadow_pending.request_ctx,
2884 );
2885 ctx.shadow_sent = true;
2886 }
2887 }
2888
2889 let duration = ctx.elapsed();
2890
2891 let status = session
2893 .response_written()
2894 .map(|r| r.status.as_u16())
2895 .unwrap_or(0);
2896
2897 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2900 (&ctx.selected_upstream_address, &ctx.upstream)
2901 {
2902 let success = status > 0 && status < 500;
2904
2905 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2906 pool.report_result_with_latency(peer_addr, success, Some(duration))
2907 .await;
2908 trace!(
2909 correlation_id = %ctx.trace_id,
2910 upstream = %upstream_id,
2911 peer_address = %peer_addr,
2912 success = success,
2913 duration_ms = duration.as_millis(),
2914 status = status,
2915 "Reported result to adaptive load balancer"
2916 );
2917 }
2918
2919 if ctx.inference_rate_limit_enabled && success {
2921 let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2922 if cold_detected {
2923 debug!(
2924 correlation_id = %ctx.trace_id,
2925 upstream = %upstream_id,
2926 peer_address = %peer_addr,
2927 duration_ms = duration.as_millis(),
2928 "Cold model detected on inference upstream"
2929 );
2930 }
2931 }
2932 }
2933
2934 if ctx.inference_rate_limit_enabled {
2937 if let (Some(route_id), Some(ref rate_limit_key)) =
2938 (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2939 {
2940 let response_headers = session
2942 .response_written()
2943 .map(|r| &r.headers)
2944 .cloned()
2945 .unwrap_or_default();
2946
2947 let streaming_result = if ctx.inference_streaming_response {
2949 ctx.inference_streaming_counter
2950 .as_ref()
2951 .map(|counter| counter.finalize())
2952 } else {
2953 None
2954 };
2955
2956 if let Some(ref result) = streaming_result {
2958 debug!(
2959 correlation_id = %ctx.trace_id,
2960 output_tokens = result.output_tokens,
2961 input_tokens = ?result.input_tokens,
2962 source = ?result.source,
2963 content_length = result.content_length,
2964 "Finalized streaming token count"
2965 );
2966 }
2967
2968 if ctx.inference_streaming_response {
2970 if let Some(ref route_config) = ctx.route_config {
2971 if let Some(ref inference) = route_config.inference {
2972 if let Some(ref guardrails) = inference.guardrails {
2973 if let Some(ref pii_config) = guardrails.pii_detection {
2974 if pii_config.enabled {
2975 if let Some(ref counter) = ctx.inference_streaming_counter {
2977 let response_content = counter.content();
2978 if !response_content.is_empty() {
2979 let pii_result = self
2980 .guardrail_processor
2981 .check_pii(
2982 pii_config,
2983 response_content,
2984 ctx.route_id.as_deref(),
2985 &ctx.trace_id,
2986 )
2987 .await;
2988
2989 match pii_result {
2990 crate::inference::PiiCheckResult::Detected {
2991 detections,
2992 redacted_content: _,
2993 } => {
2994 warn!(
2995 correlation_id = %ctx.trace_id,
2996 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2997 detection_count = detections.len(),
2998 "PII detected in inference response"
2999 );
3000
3001 ctx.pii_detection_categories = detections
3003 .iter()
3004 .map(|d| d.category.clone())
3005 .collect();
3006
3007 for detection in &detections {
3009 self.metrics.record_pii_detected(
3010 ctx.route_id.as_deref().unwrap_or("unknown"),
3011 &detection.category,
3012 );
3013 }
3014 }
3015 crate::inference::PiiCheckResult::Clean => {
3016 trace!(
3017 correlation_id = %ctx.trace_id,
3018 "No PII detected in response"
3019 );
3020 }
3021 crate::inference::PiiCheckResult::Error { message } => {
3022 debug!(
3023 correlation_id = %ctx.trace_id,
3024 error = %message,
3025 "PII detection check failed"
3026 );
3027 }
3028 }
3029 }
3030 }
3031 }
3032 }
3033 }
3034 }
3035 }
3036 }
3037
3038 let empty_body: &[u8] = &[];
3042
3043 if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3044 route_id,
3045 rate_limit_key,
3046 &response_headers,
3047 empty_body,
3048 ctx.inference_estimated_tokens,
3049 ) {
3050 let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result
3052 {
3053 if streaming.total_tokens.is_some() {
3055 (streaming.total_tokens.unwrap(), "streaming_api")
3056 } else if actual_estimate.source == crate::inference::TokenSource::Estimated
3057 {
3058 let total = ctx.inference_input_tokens + streaming.output_tokens;
3061 (total, "streaming_tiktoken")
3062 } else {
3063 (actual_estimate.tokens, "headers")
3064 }
3065 } else {
3066 (actual_estimate.tokens, "headers")
3067 };
3068
3069 ctx.inference_actual_tokens = Some(actual_tokens);
3070
3071 debug!(
3072 correlation_id = %ctx.trace_id,
3073 route_id = route_id,
3074 estimated_tokens = ctx.inference_estimated_tokens,
3075 actual_tokens = actual_tokens,
3076 source = source_info,
3077 streaming_response = ctx.inference_streaming_response,
3078 model = ?ctx.inference_model,
3079 "Recorded actual inference tokens"
3080 );
3081
3082 if ctx.inference_budget_enabled {
3084 let alerts = self.inference_rate_limit_manager.record_budget(
3085 route_id,
3086 rate_limit_key,
3087 actual_tokens,
3088 );
3089
3090 for alert in alerts.iter() {
3092 warn!(
3093 correlation_id = %ctx.trace_id,
3094 route_id = route_id,
3095 tenant = %alert.tenant,
3096 threshold_pct = alert.threshold * 100.0,
3097 tokens_used = alert.tokens_used,
3098 tokens_limit = alert.tokens_limit,
3099 "Token budget alert threshold crossed"
3100 );
3101 }
3102
3103 if let Some(status) = self
3105 .inference_rate_limit_manager
3106 .budget_status(route_id, rate_limit_key)
3107 {
3108 ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3109 }
3110 }
3111
3112 if ctx.inference_cost_enabled {
3114 if let Some(model) = ctx.inference_model.as_deref() {
3115 let (input_tokens, output_tokens) = if let Some(ref streaming) =
3117 streaming_result
3118 {
3119 let input =
3121 streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3122 let output = streaming.output_tokens;
3123 (input, output)
3124 } else {
3125 let input = ctx.inference_input_tokens;
3127 let output = actual_tokens.saturating_sub(input);
3128 (input, output)
3129 };
3130 ctx.inference_output_tokens = output_tokens;
3131
3132 if let Some(cost_result) = self
3133 .inference_rate_limit_manager
3134 .calculate_cost(route_id, model, input_tokens, output_tokens)
3135 {
3136 ctx.inference_request_cost = Some(cost_result.total_cost);
3137
3138 trace!(
3139 correlation_id = %ctx.trace_id,
3140 route_id = route_id,
3141 model = model,
3142 input_tokens = input_tokens,
3143 output_tokens = output_tokens,
3144 total_cost = cost_result.total_cost,
3145 currency = %cost_result.currency,
3146 "Calculated inference request cost"
3147 );
3148 }
3149 }
3150 }
3151 }
3152 }
3153 }
3154
3155 if self.log_manager.should_log_access(status) {
3157 let access_entry = AccessLogEntry {
3158 timestamp: chrono::Utc::now().to_rfc3339(),
3159 trace_id: ctx.trace_id.clone(),
3160 method: ctx.method.clone(),
3161 path: ctx.path.clone(),
3162 query: ctx.query.clone(),
3163 protocol: "HTTP/1.1".to_string(),
3164 status,
3165 body_bytes: ctx.response_bytes,
3166 duration_ms: duration.as_millis() as u64,
3167 client_ip: ctx.client_ip.clone(),
3168 user_agent: ctx.user_agent.clone(),
3169 referer: ctx.referer.clone(),
3170 host: ctx.host.clone(),
3171 route_id: ctx.route_id.clone(),
3172 upstream: ctx.upstream.clone(),
3173 upstream_attempts: ctx.upstream_attempts,
3174 instance_id: self.app_state.instance_id.clone(),
3175 namespace: ctx.namespace.clone(),
3176 service: ctx.service.clone(),
3177 body_bytes_sent: ctx.response_bytes,
3179 upstream_addr: ctx.selected_upstream_address.clone(),
3180 connection_reused: ctx.connection_reused,
3181 rate_limit_hit: status == 429,
3182 geo_country: ctx.geo_country_code.clone(),
3183 };
3184 self.log_manager.log_access(&access_entry);
3185 }
3186
3187 if tracing::enabled!(tracing::Level::DEBUG) {
3189 debug!(
3190 trace_id = %ctx.trace_id,
3191 method = %ctx.method,
3192 path = %ctx.path,
3193 route_id = ?ctx.route_id,
3194 upstream = ?ctx.upstream,
3195 status = status,
3196 duration_ms = duration.as_millis() as u64,
3197 upstream_attempts = ctx.upstream_attempts,
3198 error = ?_error.map(|e| e.to_string()),
3199 "Request completed"
3200 );
3201 }
3202
3203 if ctx.is_websocket_upgrade && status == 101 {
3205 info!(
3206 trace_id = %ctx.trace_id,
3207 route_id = ?ctx.route_id,
3208 upstream = ?ctx.upstream,
3209 client_ip = %ctx.client_ip,
3210 "WebSocket connection established"
3211 );
3212 }
3213
3214 if let Some(span) = ctx.otel_span.take() {
3216 span.end();
3217 }
3218 }
3219}
3220
3221impl SentinelProxy {
3226 async fn process_body_chunk_streaming(
3228 &self,
3229 body: &mut Option<Bytes>,
3230 end_of_stream: bool,
3231 ctx: &mut RequestContext,
3232 ) -> Result<(), Box<Error>> {
3233 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3235 let chunk_index = ctx.request_body_chunk_index;
3236 ctx.request_body_chunk_index += 1;
3237 ctx.body_bytes_inspected += chunk_data.len() as u64;
3238
3239 debug!(
3240 correlation_id = %ctx.trace_id,
3241 chunk_index = chunk_index,
3242 chunk_size = chunk_data.len(),
3243 end_of_stream = end_of_stream,
3244 "Streaming body chunk to agents"
3245 );
3246
3247 let agent_ctx = crate::agents::AgentCallContext {
3249 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3250 metadata: sentinel_agent_protocol::RequestMetadata {
3251 correlation_id: ctx.trace_id.clone(),
3252 request_id: ctx.trace_id.clone(),
3253 client_ip: ctx.client_ip.clone(),
3254 client_port: 0,
3255 server_name: ctx.host.clone(),
3256 protocol: "HTTP/1.1".to_string(),
3257 tls_version: None,
3258 tls_cipher: None,
3259 route_id: ctx.route_id.clone(),
3260 upstream_id: ctx.upstream.clone(),
3261 timestamp: chrono::Utc::now().to_rfc3339(),
3262 traceparent: ctx.traceparent(),
3263 },
3264 route_id: ctx.route_id.clone(),
3265 upstream_id: ctx.upstream.clone(),
3266 request_body: None, response_body: None,
3268 };
3269
3270 let agent_ids = ctx.body_inspection_agents.clone();
3271 let total_size = None; match self
3274 .agent_manager
3275 .process_request_body_streaming(
3276 &agent_ctx,
3277 &chunk_data,
3278 end_of_stream,
3279 chunk_index,
3280 ctx.body_bytes_inspected as usize,
3281 total_size,
3282 &agent_ids,
3283 )
3284 .await
3285 {
3286 Ok(decision) => {
3287 ctx.agent_needs_more = decision.needs_more;
3289
3290 if let Some(ref mutation) = decision.request_body_mutation {
3292 if !mutation.is_pass_through() {
3293 if mutation.is_drop() {
3294 *body = None;
3296 trace!(
3297 correlation_id = %ctx.trace_id,
3298 chunk_index = chunk_index,
3299 "Agent dropped body chunk"
3300 );
3301 } else if let Some(ref new_data) = mutation.data {
3302 *body = Some(Bytes::from(new_data.clone()));
3304 trace!(
3305 correlation_id = %ctx.trace_id,
3306 chunk_index = chunk_index,
3307 original_size = chunk_data.len(),
3308 new_size = new_data.len(),
3309 "Agent mutated body chunk"
3310 );
3311 }
3312 }
3313 }
3314
3315 if !decision.needs_more && !decision.is_allow() {
3317 warn!(
3318 correlation_id = %ctx.trace_id,
3319 action = ?decision.action,
3320 "Agent blocked request body"
3321 );
3322 self.metrics.record_blocked_request("agent_body_inspection");
3323
3324 let (status, message) = match &decision.action {
3325 crate::agents::AgentAction::Block { status, body, .. } => (
3326 *status,
3327 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3328 ),
3329 _ => (403, "Forbidden".to_string()),
3330 };
3331
3332 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3333 }
3334
3335 trace!(
3336 correlation_id = %ctx.trace_id,
3337 needs_more = decision.needs_more,
3338 "Agent processed body chunk"
3339 );
3340 }
3341 Err(e) => {
3342 let fail_closed = ctx
3343 .route_config
3344 .as_ref()
3345 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3346 .unwrap_or(false);
3347
3348 if fail_closed {
3349 error!(
3350 correlation_id = %ctx.trace_id,
3351 error = %e,
3352 "Agent streaming body inspection failed, blocking (fail-closed)"
3353 );
3354 return Err(Error::explain(
3355 ErrorType::HTTPStatus(503),
3356 "Service unavailable",
3357 ));
3358 } else {
3359 warn!(
3360 correlation_id = %ctx.trace_id,
3361 error = %e,
3362 "Agent streaming body inspection failed, allowing (fail-open)"
3363 );
3364 }
3365 }
3366 }
3367
3368 Ok(())
3369 }
3370
3371 async fn send_buffered_body_to_agents(
3373 &self,
3374 end_of_stream: bool,
3375 ctx: &mut RequestContext,
3376 ) -> Result<(), Box<Error>> {
3377 debug!(
3378 correlation_id = %ctx.trace_id,
3379 buffer_size = ctx.body_buffer.len(),
3380 end_of_stream = end_of_stream,
3381 agent_count = ctx.body_inspection_agents.len(),
3382 decompression_enabled = ctx.decompression_enabled,
3383 "Sending buffered body to agents for inspection"
3384 );
3385
3386 let body_for_inspection = if ctx.decompression_enabled {
3388 if let Some(ref encoding) = ctx.body_content_encoding {
3389 let config = crate::decompression::DecompressionConfig {
3390 max_ratio: ctx.max_decompression_ratio,
3391 max_output_bytes: ctx.max_decompression_bytes,
3392 };
3393
3394 match crate::decompression::decompress_body(&ctx.body_buffer, encoding, &config) {
3395 Ok(result) => {
3396 ctx.body_was_decompressed = true;
3397 self.metrics
3398 .record_decompression_success(encoding, result.ratio);
3399 debug!(
3400 correlation_id = %ctx.trace_id,
3401 encoding = %encoding,
3402 compressed_size = result.compressed_size,
3403 decompressed_size = result.decompressed_size,
3404 ratio = result.ratio,
3405 "Body decompressed for agent inspection"
3406 );
3407 result.data
3408 }
3409 Err(e) => {
3410 let failure_reason = match &e {
3412 crate::decompression::DecompressionError::RatioExceeded { .. } => {
3413 "ratio_exceeded"
3414 }
3415 crate::decompression::DecompressionError::SizeExceeded { .. } => {
3416 "size_exceeded"
3417 }
3418 crate::decompression::DecompressionError::InvalidData { .. } => {
3419 "invalid_data"
3420 }
3421 crate::decompression::DecompressionError::UnsupportedEncoding {
3422 ..
3423 } => "unsupported",
3424 crate::decompression::DecompressionError::IoError(_) => "io_error",
3425 };
3426 self.metrics
3427 .record_decompression_failure(encoding, failure_reason);
3428
3429 let fail_closed = ctx
3431 .route_config
3432 .as_ref()
3433 .map(|r| {
3434 r.policies.failure_mode == sentinel_config::FailureMode::Closed
3435 })
3436 .unwrap_or(false);
3437
3438 if fail_closed {
3439 error!(
3440 correlation_id = %ctx.trace_id,
3441 error = %e,
3442 encoding = %encoding,
3443 "Decompression failed, blocking (fail-closed)"
3444 );
3445 return Err(Error::explain(
3446 ErrorType::HTTPStatus(400),
3447 "Invalid compressed body",
3448 ));
3449 } else {
3450 warn!(
3451 correlation_id = %ctx.trace_id,
3452 error = %e,
3453 encoding = %encoding,
3454 "Decompression failed, sending compressed body (fail-open)"
3455 );
3456 ctx.body_buffer.clone()
3457 }
3458 }
3459 }
3460 } else {
3461 ctx.body_buffer.clone()
3462 }
3463 } else {
3464 ctx.body_buffer.clone()
3465 };
3466
3467 let agent_ctx = crate::agents::AgentCallContext {
3468 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3469 metadata: sentinel_agent_protocol::RequestMetadata {
3470 correlation_id: ctx.trace_id.clone(),
3471 request_id: ctx.trace_id.clone(),
3472 client_ip: ctx.client_ip.clone(),
3473 client_port: 0,
3474 server_name: ctx.host.clone(),
3475 protocol: "HTTP/1.1".to_string(),
3476 tls_version: None,
3477 tls_cipher: None,
3478 route_id: ctx.route_id.clone(),
3479 upstream_id: ctx.upstream.clone(),
3480 timestamp: chrono::Utc::now().to_rfc3339(),
3481 traceparent: ctx.traceparent(),
3482 },
3483 route_id: ctx.route_id.clone(),
3484 upstream_id: ctx.upstream.clone(),
3485 request_body: Some(body_for_inspection.clone()),
3486 response_body: None,
3487 };
3488
3489 let agent_ids = ctx.body_inspection_agents.clone();
3490 match self
3491 .agent_manager
3492 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3493 .await
3494 {
3495 Ok(decision) => {
3496 if !decision.is_allow() {
3497 warn!(
3498 correlation_id = %ctx.trace_id,
3499 action = ?decision.action,
3500 "Agent blocked request body"
3501 );
3502 self.metrics.record_blocked_request("agent_body_inspection");
3503
3504 let (status, message) = match &decision.action {
3505 crate::agents::AgentAction::Block { status, body, .. } => (
3506 *status,
3507 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3508 ),
3509 _ => (403, "Forbidden".to_string()),
3510 };
3511
3512 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3513 }
3514
3515 trace!(
3516 correlation_id = %ctx.trace_id,
3517 "Agent allowed request body"
3518 );
3519 }
3520 Err(e) => {
3521 let fail_closed = ctx
3522 .route_config
3523 .as_ref()
3524 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3525 .unwrap_or(false);
3526
3527 if fail_closed {
3528 error!(
3529 correlation_id = %ctx.trace_id,
3530 error = %e,
3531 "Agent body inspection failed, blocking (fail-closed)"
3532 );
3533 return Err(Error::explain(
3534 ErrorType::HTTPStatus(503),
3535 "Service unavailable",
3536 ));
3537 } else {
3538 warn!(
3539 correlation_id = %ctx.trace_id,
3540 error = %e,
3541 "Agent body inspection failed, allowing (fail-open)"
3542 );
3543 }
3544 }
3545 }
3546
3547 Ok(())
3548 }
3549}