1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
23use crate::rate_limit::HeaderAccessor;
24use crate::routing::RequestInfo;
25
26use super::context::RequestContext;
27use super::SentinelProxy;
28
29struct NoHeaderAccessor;
31impl HeaderAccessor for NoHeaderAccessor {
32 fn get_header(&self, _name: &str) -> Option<String> {
33 None
34 }
35}
36
37#[async_trait]
38impl ProxyHttp for SentinelProxy {
39 type CTX = RequestContext;
40
41 fn new_ctx(&self) -> Self::CTX {
42 RequestContext::new()
43 }
44
45 fn fail_to_connect(
46 &self,
47 _session: &mut Session,
48 peer: &HttpPeer,
49 ctx: &mut Self::CTX,
50 e: Box<Error>,
51 ) -> Box<Error> {
52 error!(
53 correlation_id = %ctx.trace_id,
54 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
55 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
56 peer_address = %peer.address(),
57 error = %e,
58 "Failed to connect to upstream peer"
59 );
60 e
62 }
63
64 async fn early_request_filter(
67 &self,
68 session: &mut Session,
69 ctx: &mut Self::CTX,
70 ) -> Result<(), Box<Error>> {
71 let req_header = session.req_header();
73 let method = req_header.method.as_str();
74 let path = req_header.uri.path();
75 let host = req_header
76 .headers
77 .get("host")
78 .and_then(|h| h.to_str().ok())
79 .unwrap_or("");
80
81 ctx.method = method.to_string();
82 ctx.path = path.to_string();
83 ctx.host = Some(host.to_string());
84
85 let route_match = {
87 let route_matcher = self.route_matcher.read();
88 let request_info = RequestInfo::new(method, path, host);
89 match route_matcher.match_request(&request_info) {
90 Some(m) => m,
91 None => return Ok(()), }
93 };
94
95 ctx.trace_id = self.get_trace_id(session);
96 ctx.route_id = Some(route_match.route_id.to_string());
97 ctx.route_config = Some(route_match.config.clone());
98
99 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
101 if let Ok(s) = traceparent.to_str() {
102 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
103 }
104 }
105
106 if let Some(tracer) = crate::otel::get_tracer() {
108 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
109 }
110
111 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
113 trace!(
114 correlation_id = %ctx.trace_id,
115 route_id = %route_match.route_id,
116 builtin_handler = ?route_match.config.builtin_handler,
117 "Handling builtin route in early_request_filter"
118 );
119
120 let handled = self
122 .handle_builtin_route(session, ctx, &route_match)
123 .await?;
124
125 if handled {
126 return Err(Error::explain(
128 ErrorType::InternalError,
129 "Builtin handler complete",
130 ));
131 }
132 }
133
134 Ok(())
135 }
136
137 async fn upstream_peer(
138 &self,
139 session: &mut Session,
140 ctx: &mut Self::CTX,
141 ) -> Result<Box<HttpPeer>, Box<Error>> {
142 self.reload_coordinator.inc_requests();
144
145 if ctx.config.is_none() {
147 ctx.config = Some(self.config_manager.current());
148 }
149
150 if ctx.client_ip.is_empty() {
152 ctx.client_ip = session
153 .client_addr()
154 .map(|a| a.to_string())
155 .unwrap_or_else(|| "unknown".to_string());
156 }
157
158 let req_header = session.req_header();
159
160 if ctx.method.is_empty() {
162 ctx.method = req_header.method.to_string();
163 ctx.path = req_header.uri.path().to_string();
164 ctx.query = req_header.uri.query().map(|q| q.to_string());
165 ctx.host = req_header
166 .headers
167 .get("host")
168 .and_then(|v| v.to_str().ok())
169 .map(|s| s.to_string());
170 }
171 ctx.user_agent = req_header
172 .headers
173 .get("user-agent")
174 .and_then(|v| v.to_str().ok())
175 .map(|s| s.to_string());
176 ctx.referer = req_header
177 .headers
178 .get("referer")
179 .and_then(|v| v.to_str().ok())
180 .map(|s| s.to_string());
181
182 trace!(
183 correlation_id = %ctx.trace_id,
184 client_ip = %ctx.client_ip,
185 "Request received, initializing context"
186 );
187
188 let route_match = if let Some(ref route_config) = ctx.route_config {
190 let route_id = ctx.route_id.as_deref().unwrap_or("");
191 crate::routing::RouteMatch {
192 route_id: sentinel_common::RouteId::new(route_id),
193 config: route_config.clone(),
194 }
195 } else {
196 let (match_result, route_duration) = {
198 let route_matcher = self.route_matcher.read();
199 let host = ctx.host.as_deref().unwrap_or("");
200
201 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
203
204 if route_matcher.needs_headers() {
206 request_info = request_info
207 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
208 }
209
210 if route_matcher.needs_query_params() {
212 request_info =
213 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
214 }
215
216 trace!(
217 correlation_id = %ctx.trace_id,
218 method = %request_info.method,
219 path = %request_info.path,
220 host = %request_info.host,
221 "Built request info for route matching"
222 );
223
224 let route_start = std::time::Instant::now();
225 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
226 warn!(
227 correlation_id = %ctx.trace_id,
228 method = %request_info.method,
229 path = %request_info.path,
230 host = %request_info.host,
231 "No matching route found for request"
232 );
233 Error::explain(ErrorType::InternalError, "No matching route found")
234 })?;
235 let route_duration = route_start.elapsed();
236 (route_match, route_duration)
238 };
239
240 ctx.route_id = Some(match_result.route_id.to_string());
241 ctx.route_config = Some(match_result.config.clone());
242
243 if ctx.trace_id.is_empty() {
245 ctx.trace_id = self.get_trace_id(session);
246
247 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
249 {
250 if let Ok(s) = traceparent.to_str() {
251 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
252 }
253 }
254
255 if let Some(tracer) = crate::otel::get_tracer() {
257 ctx.otel_span = Some(tracer.start_span(
258 &ctx.method,
259 &ctx.path,
260 ctx.trace_context.as_ref(),
261 ));
262 }
263 }
264
265 trace!(
266 correlation_id = %ctx.trace_id,
267 route_id = %match_result.route_id,
268 route_duration_us = route_duration.as_micros(),
269 service_type = ?match_result.config.service_type,
270 "Route matched"
271 );
272 match_result
273 };
274
275 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
277 trace!(
278 correlation_id = %ctx.trace_id,
279 route_id = %route_match.route_id,
280 builtin_handler = ?route_match.config.builtin_handler,
281 "Route type is builtin, skipping upstream"
282 );
283 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
285 return Err(Error::explain(
287 ErrorType::InternalError,
288 "Builtin handler handled in request_filter",
289 ));
290 }
291
292 if route_match.config.service_type == sentinel_config::ServiceType::Static {
294 trace!(
295 correlation_id = %ctx.trace_id,
296 route_id = %route_match.route_id,
297 "Route type is static, checking for static server"
298 );
299 if self
301 .static_servers
302 .get(route_match.route_id.as_str())
303 .await
304 .is_some()
305 {
306 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
308 info!(
309 correlation_id = %ctx.trace_id,
310 route_id = %route_match.route_id,
311 path = %ctx.path,
312 "Serving static file"
313 );
314 return Err(Error::explain(
316 ErrorType::InternalError,
317 "Static file serving handled in request_filter",
318 ));
319 }
320 }
321
322 if let Some(ref upstream) = route_match.config.upstream {
324 ctx.upstream = Some(upstream.clone());
325 trace!(
326 correlation_id = %ctx.trace_id,
327 route_id = %route_match.route_id,
328 upstream = %upstream,
329 "Upstream configured for route"
330 );
331 } else {
332 error!(
333 correlation_id = %ctx.trace_id,
334 route_id = %route_match.route_id,
335 "Route has no upstream configured"
336 );
337 return Err(Error::explain(
338 ErrorType::InternalError,
339 format!(
340 "Route '{}' has no upstream configured",
341 route_match.route_id
342 ),
343 ));
344 }
345
346 debug!(
347 correlation_id = %ctx.trace_id,
348 route_id = %route_match.route_id,
349 upstream = ?ctx.upstream,
350 method = %req_header.method,
351 path = %req_header.uri.path(),
352 host = ctx.host.as_deref().unwrap_or("-"),
353 client_ip = %ctx.client_ip,
354 "Processing request"
355 );
356
357 if ctx
359 .upstream
360 .as_ref()
361 .is_some_and(|u| u.starts_with("_static_"))
362 {
363 return Err(Error::explain(
365 ErrorType::InternalError,
366 "Static route should be handled in request_filter",
367 ));
368 }
369
370 let upstream_name = ctx
371 .upstream
372 .as_ref()
373 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
374
375 trace!(
376 correlation_id = %ctx.trace_id,
377 upstream = %upstream_name,
378 "Looking up upstream pool"
379 );
380
381 let pool = self
382 .upstream_pools
383 .get(upstream_name)
384 .await
385 .ok_or_else(|| {
386 error!(
387 correlation_id = %ctx.trace_id,
388 upstream = %upstream_name,
389 "Upstream pool not found"
390 );
391 Error::explain(
392 ErrorType::InternalError,
393 format!("Upstream pool '{}' not found", upstream_name),
394 )
395 })?;
396
397 let max_retries = route_match
399 .config
400 .retry_policy
401 .as_ref()
402 .map(|r| r.max_attempts)
403 .unwrap_or(1);
404
405 trace!(
406 correlation_id = %ctx.trace_id,
407 upstream = %upstream_name,
408 max_retries = max_retries,
409 "Starting upstream peer selection"
410 );
411
412 let mut last_error = None;
413 let selection_start = std::time::Instant::now();
414
415 for attempt in 1..=max_retries {
416 ctx.upstream_attempts = attempt;
417
418 trace!(
419 correlation_id = %ctx.trace_id,
420 upstream = %upstream_name,
421 attempt = attempt,
422 max_retries = max_retries,
423 "Attempting to select upstream peer"
424 );
425
426 match pool.select_peer(None).await {
427 Ok(peer) => {
428 let selection_duration = selection_start.elapsed();
429 let peer_addr = peer.address().to_string();
431 ctx.selected_upstream_address = Some(peer_addr.clone());
432 debug!(
433 correlation_id = %ctx.trace_id,
434 upstream = %upstream_name,
435 peer_address = %peer_addr,
436 attempt = attempt,
437 selection_duration_us = selection_duration.as_micros(),
438 "Selected upstream peer"
439 );
440 return Ok(Box::new(peer));
441 }
442 Err(e) => {
443 warn!(
444 correlation_id = %ctx.trace_id,
445 upstream = %upstream_name,
446 attempt = attempt,
447 max_retries = max_retries,
448 error = %e,
449 "Failed to select upstream peer"
450 );
451 last_error = Some(e);
452
453 if attempt < max_retries {
454 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
456 trace!(
457 correlation_id = %ctx.trace_id,
458 backoff_ms = backoff.as_millis(),
459 "Backing off before retry"
460 );
461 sleep(backoff).await;
462 }
463 }
464 }
465 }
466
467 let selection_duration = selection_start.elapsed();
468 error!(
469 correlation_id = %ctx.trace_id,
470 upstream = %upstream_name,
471 attempts = max_retries,
472 selection_duration_ms = selection_duration.as_millis(),
473 last_error = ?last_error,
474 "All upstream selection attempts failed"
475 );
476
477 Err(Error::explain(
478 ErrorType::InternalError,
479 format!("All upstream attempts failed: {:?}", last_error),
480 ))
481 }
482
483 async fn request_filter(
484 &self,
485 session: &mut Session,
486 ctx: &mut Self::CTX,
487 ) -> Result<bool, Box<Error>> {
488 trace!(
489 correlation_id = %ctx.trace_id,
490 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
491 "Starting request filter phase"
492 );
493
494 if let Some(route_id) = ctx.route_id.as_deref() {
497 if self.rate_limit_manager.has_route_limiter(route_id) {
498 let rate_result = self.rate_limit_manager.check(
499 route_id,
500 &ctx.client_ip,
501 &ctx.path,
502 Option::<&NoHeaderAccessor>::None,
503 );
504
505 if rate_result.limit > 0 {
507 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
508 limit: rate_result.limit,
509 remaining: rate_result.remaining,
510 reset_at: rate_result.reset_at,
511 });
512 }
513
514 if !rate_result.allowed {
515 use sentinel_config::RateLimitAction;
516
517 match rate_result.action {
518 RateLimitAction::Reject => {
519 warn!(
520 correlation_id = %ctx.trace_id,
521 route_id = route_id,
522 client_ip = %ctx.client_ip,
523 limiter = %rate_result.limiter,
524 limit = rate_result.limit,
525 remaining = rate_result.remaining,
526 "Request rate limited"
527 );
528 self.metrics.record_blocked_request("rate_limited");
529
530 let audit_entry = AuditLogEntry::rate_limited(
532 &ctx.trace_id,
533 &ctx.method,
534 &ctx.path,
535 &ctx.client_ip,
536 &rate_result.limiter,
537 )
538 .with_route_id(route_id)
539 .with_status_code(rate_result.status_code);
540 self.log_manager.log_audit(&audit_entry);
541
542 let body = rate_result
544 .message
545 .unwrap_or_else(|| "Rate limit exceeded".to_string());
546
547 let retry_after = rate_result.reset_at.saturating_sub(
549 std::time::SystemTime::now()
550 .duration_since(std::time::UNIX_EPOCH)
551 .unwrap_or_default()
552 .as_secs(),
553 );
554 crate::http_helpers::write_rate_limit_error(
555 session,
556 rate_result.status_code,
557 &body,
558 rate_result.limit,
559 rate_result.remaining,
560 rate_result.reset_at,
561 retry_after,
562 )
563 .await?;
564 return Ok(true); }
566 RateLimitAction::LogOnly => {
567 debug!(
568 correlation_id = %ctx.trace_id,
569 route_id = route_id,
570 "Rate limit exceeded (log only mode)"
571 );
572 }
574 RateLimitAction::Delay => {
575 if let Some(delay_ms) = rate_result.suggested_delay_ms {
577 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
579
580 if actual_delay > 0 {
581 debug!(
582 correlation_id = %ctx.trace_id,
583 route_id = route_id,
584 suggested_delay_ms = delay_ms,
585 max_delay_ms = rate_result.max_delay_ms,
586 actual_delay_ms = actual_delay,
587 "Applying rate limit delay"
588 );
589
590 tokio::time::sleep(std::time::Duration::from_millis(
591 actual_delay,
592 ))
593 .await;
594 }
595 }
596 }
598 }
599 }
600 }
601 }
602
603 if let Some(route_id) = ctx.route_id.as_deref() {
605 if let Some(ref route_config) = ctx.route_config {
606 for filter_id in &route_config.filters {
607 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
608 ctx.geo_country_code = result.country_code.clone();
610 ctx.geo_lookup_performed = true;
611
612 if !result.allowed {
613 warn!(
614 correlation_id = %ctx.trace_id,
615 route_id = route_id,
616 client_ip = %ctx.client_ip,
617 country = ?result.country_code,
618 filter_id = %filter_id,
619 "Request blocked by geo filter"
620 );
621 self.metrics.record_blocked_request("geo_blocked");
622
623 let audit_entry = AuditLogEntry::new(
625 &ctx.trace_id,
626 AuditEventType::Blocked,
627 &ctx.method,
628 &ctx.path,
629 &ctx.client_ip,
630 )
631 .with_route_id(route_id)
632 .with_status_code(result.status_code)
633 .with_reason(format!(
634 "Geo blocked: country={}, filter={}",
635 result.country_code.as_deref().unwrap_or("unknown"),
636 filter_id
637 ));
638 self.log_manager.log_audit(&audit_entry);
639
640 let body = result
642 .block_message
643 .unwrap_or_else(|| "Access denied".to_string());
644
645 crate::http_helpers::write_error(
646 session,
647 result.status_code,
648 &body,
649 "text/plain",
650 )
651 .await?;
652 return Ok(true); }
654
655 break;
657 }
658 }
659 }
660 }
661
662 let is_websocket_upgrade = session
664 .req_header()
665 .headers
666 .get(http::header::UPGRADE)
667 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
668 .unwrap_or(false);
669
670 if is_websocket_upgrade {
671 ctx.is_websocket_upgrade = true;
672
673 if let Some(ref route_config) = ctx.route_config {
675 if !route_config.websocket {
676 warn!(
677 correlation_id = %ctx.trace_id,
678 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
679 client_ip = %ctx.client_ip,
680 "WebSocket upgrade rejected: not enabled for route"
681 );
682
683 self.metrics.record_blocked_request("websocket_not_enabled");
684
685 let audit_entry = AuditLogEntry::new(
687 &ctx.trace_id,
688 AuditEventType::Blocked,
689 &ctx.method,
690 &ctx.path,
691 &ctx.client_ip,
692 )
693 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
694 .with_action("websocket_rejected")
695 .with_reason("WebSocket not enabled for route");
696 self.log_manager.log_audit(&audit_entry);
697
698 crate::http_helpers::write_error(
700 session,
701 403,
702 "WebSocket not enabled for this route",
703 "text/plain",
704 )
705 .await?;
706 return Ok(true); }
708
709 debug!(
710 correlation_id = %ctx.trace_id,
711 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
712 "WebSocket upgrade request allowed"
713 );
714
715 if route_config.websocket_inspection {
717 let has_compression = session
719 .req_header()
720 .headers
721 .get("Sec-WebSocket-Extensions")
722 .and_then(|v| v.to_str().ok())
723 .map(|s| s.contains("permessage-deflate"))
724 .unwrap_or(false);
725
726 if has_compression {
727 debug!(
728 correlation_id = %ctx.trace_id,
729 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
730 "WebSocket inspection skipped: permessage-deflate negotiated"
731 );
732 ctx.websocket_skip_inspection = true;
733 } else {
734 ctx.websocket_inspection_enabled = true;
735
736 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
738 sentinel_agent_protocol::EventType::WebSocketFrame,
739 );
740
741 debug!(
742 correlation_id = %ctx.trace_id,
743 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
744 agent_count = ctx.websocket_inspection_agents.len(),
745 "WebSocket frame inspection enabled"
746 );
747 }
748 }
749 }
750 }
751
752 if let Some(route_config) = ctx.route_config.clone() {
755 if route_config.service_type == sentinel_config::ServiceType::Static {
756 trace!(
757 correlation_id = %ctx.trace_id,
758 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
759 "Handling static file route"
760 );
761 let route_match = crate::routing::RouteMatch {
763 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
764 config: route_config.clone(),
765 };
766 return self.handle_static_route(session, ctx, &route_match).await;
767 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
768 trace!(
769 correlation_id = %ctx.trace_id,
770 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
771 builtin_handler = ?route_config.builtin_handler,
772 "Handling builtin route"
773 );
774 let route_match = crate::routing::RouteMatch {
776 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
777 config: route_config.clone(),
778 };
779 return self.handle_builtin_route(session, ctx, &route_match).await;
780 }
781 }
782
783 if let Some(route_id) = ctx.route_id.clone() {
785 if let Some(validator) = self.validators.get(&route_id).await {
786 trace!(
787 correlation_id = %ctx.trace_id,
788 route_id = %route_id,
789 "Running API schema validation"
790 );
791 if let Some(result) = self
792 .validate_api_request(session, ctx, &route_id, &validator)
793 .await?
794 {
795 debug!(
796 correlation_id = %ctx.trace_id,
797 route_id = %route_id,
798 validation_passed = result,
799 "API validation complete"
800 );
801 return Ok(result);
802 }
803 }
804 }
805
806 let client_addr = session
808 .client_addr()
809 .map(|a| format!("{}", a))
810 .unwrap_or_else(|| "unknown".to_string());
811 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
812
813 let req_header = session.req_header_mut();
814
815 req_header
817 .insert_header("X-Correlation-Id", &ctx.trace_id)
818 .ok();
819 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
820
821 let config = ctx
823 .config
824 .get_or_insert_with(|| self.config_manager.current());
825
826 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
831 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
832 && header_count > config.limits.max_header_count
833 {
834 warn!(
835 correlation_id = %ctx.trace_id,
836 header_count = header_count,
837 limit = config.limits.max_header_count,
838 "Request blocked: exceeds header count limit"
839 );
840
841 self.metrics.record_blocked_request("header_count_exceeded");
842 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
843 }
844
845 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
847 let total_header_size: usize = req_header
848 .headers
849 .iter()
850 .map(|(k, v)| k.as_str().len() + v.len())
851 .sum();
852
853 if total_header_size > config.limits.max_header_size_bytes {
854 warn!(
855 correlation_id = %ctx.trace_id,
856 header_size = total_header_size,
857 limit = config.limits.max_header_size_bytes,
858 "Request blocked: exceeds header size limit"
859 );
860
861 self.metrics.record_blocked_request("header_size_exceeded");
862 return Err(Error::explain(
863 ErrorType::InternalError,
864 "Headers too large",
865 ));
866 }
867 }
868
869 trace!(
871 correlation_id = %ctx.trace_id,
872 "Processing request through agents"
873 );
874 if let Err(e) = self
875 .process_agents(session, ctx, &client_addr, client_port)
876 .await
877 {
878 if let ErrorType::HTTPStatus(status) = e.etype() {
881 let error_msg = e.to_string();
883 let body = error_msg
884 .split("context:")
885 .nth(1)
886 .map(|s| s.trim())
887 .unwrap_or("Request blocked");
888 debug!(
889 correlation_id = %ctx.trace_id,
890 status = status,
891 body = %body,
892 "Sending HTTP error response for agent block"
893 );
894 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
895 return Ok(true); }
897 return Err(e);
899 }
900
901 trace!(
902 correlation_id = %ctx.trace_id,
903 "Request filter phase complete, forwarding to upstream"
904 );
905
906 Ok(false) }
908
909 async fn request_body_filter(
916 &self,
917 _session: &mut Session,
918 body: &mut Option<Bytes>,
919 end_of_stream: bool,
920 ctx: &mut Self::CTX,
921 ) -> Result<(), Box<Error>> {
922 use sentinel_config::BodyStreamingMode;
923
924 if ctx.is_websocket_upgrade {
926 if let Some(ref handler) = ctx.websocket_handler {
927 let result = handler.process_client_data(body.take()).await;
928 match result {
929 crate::websocket::ProcessResult::Forward(data) => {
930 *body = data;
931 }
932 crate::websocket::ProcessResult::Close(reason) => {
933 warn!(
934 correlation_id = %ctx.trace_id,
935 code = reason.code,
936 reason = %reason.reason,
937 "WebSocket connection closed by agent (client->server)"
938 );
939 return Err(Error::explain(
941 ErrorType::InternalError,
942 format!("WebSocket closed: {} {}", reason.code, reason.reason),
943 ));
944 }
945 }
946 }
947 return Ok(());
949 }
950
951 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
953 if chunk_len > 0 {
954 ctx.request_body_bytes += chunk_len as u64;
955
956 trace!(
957 correlation_id = %ctx.trace_id,
958 chunk_size = chunk_len,
959 total_body_bytes = ctx.request_body_bytes,
960 end_of_stream = end_of_stream,
961 streaming_mode = ?ctx.request_body_streaming_mode,
962 "Processing request body chunk"
963 );
964
965 let config = ctx
967 .config
968 .get_or_insert_with(|| self.config_manager.current());
969 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
970 warn!(
971 correlation_id = %ctx.trace_id,
972 body_bytes = ctx.request_body_bytes,
973 limit = config.limits.max_body_size_bytes,
974 "Request body size limit exceeded"
975 );
976 self.metrics.record_blocked_request("body_size_exceeded");
977 return Err(Error::explain(
978 ErrorType::InternalError,
979 "Request body too large",
980 ));
981 }
982 }
983
984 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
986 let config = ctx
987 .config
988 .get_or_insert_with(|| self.config_manager.current());
989 let max_inspection_bytes = config
990 .waf
991 .as_ref()
992 .map(|w| w.body_inspection.max_inspection_bytes as u64)
993 .unwrap_or(1024 * 1024);
994
995 match ctx.request_body_streaming_mode {
996 BodyStreamingMode::Stream => {
997 if body.is_some() {
999 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1000 .await?;
1001 } else if end_of_stream && ctx.agent_needs_more {
1002 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1004 .await?;
1005 }
1006 }
1007 BodyStreamingMode::Hybrid { buffer_threshold } => {
1008 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1010 if let Some(ref chunk) = body {
1012 let bytes_to_buffer = std::cmp::min(
1013 chunk.len(),
1014 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1015 );
1016 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1017 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1018
1019 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1021 {
1022 self.send_buffered_body_to_agents(
1024 end_of_stream && chunk.len() == bytes_to_buffer,
1025 ctx,
1026 )
1027 .await?;
1028 ctx.body_buffer.clear();
1029
1030 if bytes_to_buffer < chunk.len() {
1032 let remaining = chunk.slice(bytes_to_buffer..);
1033 let mut remaining_body = Some(remaining);
1034 self.process_body_chunk_streaming(
1035 &mut remaining_body,
1036 end_of_stream,
1037 ctx,
1038 )
1039 .await?;
1040 }
1041 }
1042 }
1043 } else {
1044 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1046 .await?;
1047 }
1048 }
1049 BodyStreamingMode::Buffer => {
1050 if let Some(ref chunk) = body {
1052 if ctx.body_bytes_inspected < max_inspection_bytes {
1053 let bytes_to_inspect = std::cmp::min(
1054 chunk.len() as u64,
1055 max_inspection_bytes - ctx.body_bytes_inspected,
1056 ) as usize;
1057
1058 ctx.body_buffer
1059 .extend_from_slice(&chunk[..bytes_to_inspect]);
1060 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1061
1062 trace!(
1063 correlation_id = %ctx.trace_id,
1064 bytes_inspected = ctx.body_bytes_inspected,
1065 max_inspection_bytes = max_inspection_bytes,
1066 buffer_size = ctx.body_buffer.len(),
1067 "Buffering body for agent inspection"
1068 );
1069 }
1070 }
1071
1072 let should_send =
1074 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1075 if should_send && !ctx.body_buffer.is_empty() {
1076 self.send_buffered_body_to_agents(end_of_stream, ctx)
1077 .await?;
1078 ctx.body_buffer.clear();
1079 }
1080 }
1081 }
1082 }
1083
1084 if end_of_stream {
1085 trace!(
1086 correlation_id = %ctx.trace_id,
1087 total_body_bytes = ctx.request_body_bytes,
1088 bytes_inspected = ctx.body_bytes_inspected,
1089 "Request body complete"
1090 );
1091 }
1092
1093 Ok(())
1094 }
1095
1096 async fn response_filter(
1097 &self,
1098 _session: &mut Session,
1099 upstream_response: &mut ResponseHeader,
1100 ctx: &mut Self::CTX,
1101 ) -> Result<(), Box<Error>> {
1102 let status = upstream_response.status.as_u16();
1103 let duration = ctx.elapsed();
1104
1105 trace!(
1106 correlation_id = %ctx.trace_id,
1107 status = status,
1108 "Starting response filter phase"
1109 );
1110
1111 if status == 101 && ctx.is_websocket_upgrade {
1113 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1114 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1116 self.agent_manager.clone(),
1117 ctx.route_id
1118 .clone()
1119 .unwrap_or_else(|| "unknown".to_string()),
1120 ctx.trace_id.clone(),
1121 ctx.client_ip.clone(),
1122 100, Some(self.metrics.clone()),
1124 );
1125
1126 let handler = crate::websocket::WebSocketHandler::new(
1127 std::sync::Arc::new(inspector),
1128 1024 * 1024, );
1130
1131 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1132
1133 info!(
1134 correlation_id = %ctx.trace_id,
1135 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1136 agent_count = ctx.websocket_inspection_agents.len(),
1137 "WebSocket upgrade successful, frame inspection enabled"
1138 );
1139 } else if ctx.websocket_skip_inspection {
1140 debug!(
1141 correlation_id = %ctx.trace_id,
1142 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1143 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1144 );
1145 } else {
1146 debug!(
1147 correlation_id = %ctx.trace_id,
1148 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1149 "WebSocket upgrade successful"
1150 );
1151 }
1152 }
1153
1154 trace!(
1156 correlation_id = %ctx.trace_id,
1157 "Applying security headers"
1158 );
1159 self.apply_security_headers(upstream_response).ok();
1160
1161 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1163
1164 if let Some(ref rate_info) = ctx.rate_limit_info {
1166 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1167 upstream_response
1168 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1169 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1170 }
1171
1172 if let Some(ref country_code) = ctx.geo_country_code {
1174 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1175 }
1176
1177 if status >= 400 {
1179 trace!(
1180 correlation_id = %ctx.trace_id,
1181 status = status,
1182 "Handling error response"
1183 );
1184 self.handle_error_response(upstream_response, ctx).await?;
1185 }
1186
1187 self.metrics.record_request(
1189 ctx.route_id.as_deref().unwrap_or("unknown"),
1190 &ctx.method,
1191 status,
1192 duration,
1193 );
1194
1195 if let Some(ref mut span) = ctx.otel_span {
1197 span.set_status(status);
1198 if let Some(ref upstream) = ctx.upstream {
1199 span.set_upstream(upstream, "");
1200 }
1201 if status >= 500 {
1202 span.record_error(&format!("HTTP {}", status));
1203 }
1204 }
1205
1206 if let Some(ref upstream) = ctx.upstream {
1208 let success = status < 500;
1209
1210 trace!(
1211 correlation_id = %ctx.trace_id,
1212 upstream = %upstream,
1213 success = success,
1214 status = status,
1215 "Recording passive health check result"
1216 );
1217
1218 let error_msg = if !success {
1219 Some(format!("HTTP {}", status))
1220 } else {
1221 None
1222 };
1223 self.passive_health
1224 .record_outcome(upstream, success, error_msg.as_deref())
1225 .await;
1226
1227 if let Some(pool) = self.upstream_pools.get(upstream).await {
1229 pool.report_result(upstream, success).await;
1230 }
1231
1232 if !success {
1233 warn!(
1234 correlation_id = %ctx.trace_id,
1235 upstream = %upstream,
1236 status = status,
1237 "Upstream returned error status"
1238 );
1239 }
1240 }
1241
1242 if status >= 500 {
1244 error!(
1245 correlation_id = %ctx.trace_id,
1246 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1247 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1248 method = %ctx.method,
1249 path = %ctx.path,
1250 status = status,
1251 duration_ms = duration.as_millis(),
1252 attempts = ctx.upstream_attempts,
1253 "Request completed with server error"
1254 );
1255 } else if status >= 400 {
1256 warn!(
1257 correlation_id = %ctx.trace_id,
1258 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1259 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1260 method = %ctx.method,
1261 path = %ctx.path,
1262 status = status,
1263 duration_ms = duration.as_millis(),
1264 "Request completed with client error"
1265 );
1266 } else {
1267 debug!(
1268 correlation_id = %ctx.trace_id,
1269 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1271 method = %ctx.method,
1272 path = %ctx.path,
1273 status = status,
1274 duration_ms = duration.as_millis(),
1275 attempts = ctx.upstream_attempts,
1276 "Request completed"
1277 );
1278 }
1279
1280 Ok(())
1281 }
1282
1283 async fn upstream_request_filter(
1286 &self,
1287 _session: &mut Session,
1288 upstream_request: &mut pingora::http::RequestHeader,
1289 ctx: &mut Self::CTX,
1290 ) -> Result<()>
1291 where
1292 Self::CTX: Send + Sync,
1293 {
1294 trace!(
1295 correlation_id = %ctx.trace_id,
1296 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1297 "Applying upstream request modifications"
1298 );
1299
1300 upstream_request
1302 .insert_header("X-Trace-Id", &ctx.trace_id)
1303 .ok();
1304
1305 if let Some(ref span) = ctx.otel_span {
1307 let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1308 let traceparent =
1309 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1310 upstream_request
1311 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1312 .ok();
1313 }
1314
1315 upstream_request
1317 .insert_header("X-Forwarded-By", "Sentinel")
1318 .ok();
1319
1320 if let Some(ref route_config) = ctx.route_config {
1323 let mods = route_config.policies.request_headers.clone();
1324
1325 for (name, value) in mods.set {
1327 upstream_request.insert_header(name, value).ok();
1328 }
1329
1330 for (name, value) in mods.add {
1332 upstream_request.append_header(name, value).ok();
1333 }
1334
1335 for name in &mods.remove {
1337 upstream_request.remove_header(name);
1338 }
1339
1340 trace!(
1341 correlation_id = %ctx.trace_id,
1342 "Applied request header modifications"
1343 );
1344 }
1345
1346 upstream_request.remove_header("X-Internal-Token");
1348 upstream_request.remove_header("Authorization-Internal");
1349
1350 if let Some(ref route_config) = ctx.route_config {
1353 if let Some(ref shadow_config) = route_config.shadow {
1354 let pools_snapshot = self.upstream_pools.snapshot().await;
1356 let upstream_pools = std::sync::Arc::new(pools_snapshot);
1357
1358 let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1360
1361 let shadow_manager = crate::shadow::ShadowManager::new(
1363 upstream_pools,
1364 shadow_config.clone(),
1365 Some(std::sync::Arc::clone(&self.metrics)),
1366 route_id,
1367 );
1368
1369 if shadow_manager.should_shadow(upstream_request) {
1371 trace!(
1372 correlation_id = %ctx.trace_id,
1373 shadow_upstream = %shadow_config.upstream,
1374 percentage = shadow_config.percentage,
1375 "Shadowing request"
1376 );
1377
1378 let shadow_headers = upstream_request.clone();
1380
1381 let shadow_ctx = crate::upstream::RequestContext {
1383 client_ip: ctx.client_ip.parse().ok(),
1384 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
1386 method: ctx.method.clone(),
1387 };
1388
1389 let buffer_body = shadow_config.buffer_body
1391 && crate::shadow::should_buffer_method(&ctx.method);
1392
1393 let body = None;
1397
1398 shadow_manager.shadow_request(shadow_headers, body, shadow_ctx);
1400 }
1401 }
1402 }
1403
1404 Ok(())
1405 }
1406
1407 fn response_body_filter(
1413 &self,
1414 _session: &mut Session,
1415 body: &mut Option<Bytes>,
1416 end_of_stream: bool,
1417 ctx: &mut Self::CTX,
1418 ) -> Result<Option<Duration>, Box<Error>> {
1419 if ctx.is_websocket_upgrade {
1422 if let Some(ref handler) = ctx.websocket_handler {
1423 let handler = handler.clone();
1424 let data = body.take();
1425
1426 let result = tokio::task::block_in_place(|| {
1429 tokio::runtime::Handle::current()
1430 .block_on(async { handler.process_server_data(data).await })
1431 });
1432
1433 match result {
1434 crate::websocket::ProcessResult::Forward(data) => {
1435 *body = data;
1436 }
1437 crate::websocket::ProcessResult::Close(reason) => {
1438 warn!(
1439 correlation_id = %ctx.trace_id,
1440 code = reason.code,
1441 reason = %reason.reason,
1442 "WebSocket connection closed by agent (server->client)"
1443 );
1444 let close_frame =
1447 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1448 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1449 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1450 *body = Some(Bytes::from(encoded));
1451 }
1452 }
1453 }
1454 }
1455 return Ok(None);
1457 }
1458
1459 if let Some(ref chunk) = body {
1461 ctx.response_bytes += chunk.len() as u64;
1462
1463 trace!(
1464 correlation_id = %ctx.trace_id,
1465 chunk_size = chunk.len(),
1466 total_response_bytes = ctx.response_bytes,
1467 end_of_stream = end_of_stream,
1468 "Processing response body chunk"
1469 );
1470
1471 if ctx.response_body_inspection_enabled
1475 && !ctx.response_body_inspection_agents.is_empty()
1476 {
1477 let config = ctx
1478 .config
1479 .get_or_insert_with(|| self.config_manager.current());
1480 let max_inspection_bytes = config
1481 .waf
1482 .as_ref()
1483 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1484 .unwrap_or(1024 * 1024);
1485
1486 if ctx.response_body_bytes_inspected < max_inspection_bytes {
1487 let bytes_to_inspect = std::cmp::min(
1488 chunk.len() as u64,
1489 max_inspection_bytes - ctx.response_body_bytes_inspected,
1490 ) as usize;
1491
1492 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1496 ctx.response_body_chunk_index += 1;
1497
1498 trace!(
1499 correlation_id = %ctx.trace_id,
1500 bytes_inspected = ctx.response_body_bytes_inspected,
1501 max_inspection_bytes = max_inspection_bytes,
1502 chunk_index = ctx.response_body_chunk_index,
1503 "Tracking response body for inspection"
1504 );
1505 }
1506 }
1507 }
1508
1509 if end_of_stream {
1510 trace!(
1511 correlation_id = %ctx.trace_id,
1512 total_response_bytes = ctx.response_bytes,
1513 response_bytes_inspected = ctx.response_body_bytes_inspected,
1514 "Response body complete"
1515 );
1516 }
1517
1518 Ok(None)
1520 }
1521
1522 async fn connected_to_upstream(
1525 &self,
1526 _session: &mut Session,
1527 reused: bool,
1528 peer: &HttpPeer,
1529 #[cfg(unix)] _fd: RawFd,
1530 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1531 digest: Option<&Digest>,
1532 ctx: &mut Self::CTX,
1533 ) -> Result<(), Box<Error>> {
1534 ctx.connection_reused = reused;
1536
1537 if reused {
1539 trace!(
1540 correlation_id = %ctx.trace_id,
1541 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1542 peer_address = %peer.address(),
1543 "Reusing existing upstream connection"
1544 );
1545 } else {
1546 debug!(
1547 correlation_id = %ctx.trace_id,
1548 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1549 peer_address = %peer.address(),
1550 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1551 "Established new upstream connection"
1552 );
1553 }
1554
1555 Ok(())
1556 }
1557
1558 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1568 let route_id = match ctx.route_id.as_deref() {
1570 Some(id) => id,
1571 None => {
1572 trace!(
1573 correlation_id = %ctx.trace_id,
1574 "Cache filter: no route ID, skipping cache"
1575 );
1576 return Ok(());
1577 }
1578 };
1579
1580 if !self.cache_manager.is_enabled(route_id) {
1582 trace!(
1583 correlation_id = %ctx.trace_id,
1584 route_id = %route_id,
1585 "Cache disabled for route"
1586 );
1587 return Ok(());
1588 }
1589
1590 if !self
1592 .cache_manager
1593 .is_method_cacheable(route_id, &ctx.method)
1594 {
1595 trace!(
1596 correlation_id = %ctx.trace_id,
1597 route_id = %route_id,
1598 method = %ctx.method,
1599 "Method not cacheable"
1600 );
1601 return Ok(());
1602 }
1603
1604 debug!(
1606 correlation_id = %ctx.trace_id,
1607 route_id = %route_id,
1608 method = %ctx.method,
1609 path = %ctx.path,
1610 "Enabling HTTP caching for request"
1611 );
1612
1613 let storage = get_cache_storage();
1615 let eviction = get_cache_eviction();
1616 let cache_lock = get_cache_lock();
1617
1618 session.cache.enable(
1620 storage,
1621 Some(eviction),
1622 None, Some(cache_lock),
1624 None, );
1626
1627 ctx.cache_eligible = true;
1629
1630 trace!(
1631 correlation_id = %ctx.trace_id,
1632 route_id = %route_id,
1633 cache_enabled = session.cache.enabled(),
1634 "Cache enabled for request"
1635 );
1636
1637 Ok(())
1638 }
1639
1640 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1645 let req_header = session.req_header();
1646 let method = req_header.method.as_str();
1647 let path = req_header.uri.path();
1648 let host = ctx.host.as_deref().unwrap_or("unknown");
1649 let query = req_header.uri.query();
1650
1651 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1653
1654 trace!(
1655 correlation_id = %ctx.trace_id,
1656 cache_key = %key_string,
1657 "Generated cache key"
1658 );
1659
1660 Ok(CacheKey::default(req_header))
1663 }
1664
1665 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1670 session.cache.cache_miss();
1672
1673 if let Some(route_id) = ctx.route_id.as_deref() {
1675 self.cache_manager.stats().record_miss();
1676
1677 trace!(
1678 correlation_id = %ctx.trace_id,
1679 route_id = %route_id,
1680 path = %ctx.path,
1681 "Cache miss"
1682 );
1683 }
1684 }
1685
1686 async fn cache_hit_filter(
1692 &self,
1693 session: &mut Session,
1694 meta: &CacheMeta,
1695 _hit_handler: &mut HitHandler,
1696 is_fresh: bool,
1697 ctx: &mut Self::CTX,
1698 ) -> Result<Option<ForcedInvalidationKind>>
1699 where
1700 Self::CTX: Send + Sync,
1701 {
1702 let req_header = session.req_header();
1704 let method = req_header.method.as_str();
1705 let path = req_header.uri.path();
1706 let host = req_header.uri.host().unwrap_or("localhost");
1707 let query = req_header.uri.query();
1708
1709 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1711
1712 if self.cache_manager.should_invalidate(&cache_key) {
1714 info!(
1715 correlation_id = %ctx.trace_id,
1716 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1717 cache_key = %cache_key,
1718 "Cache entry invalidated by purge request"
1719 );
1720 return Ok(Some(ForcedInvalidationKind::ForceExpired));
1722 }
1723
1724 if is_fresh {
1726 self.cache_manager.stats().record_hit();
1727
1728 debug!(
1729 correlation_id = %ctx.trace_id,
1730 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1731 is_fresh = is_fresh,
1732 "Cache hit (fresh)"
1733 );
1734 } else {
1735 trace!(
1736 correlation_id = %ctx.trace_id,
1737 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1738 is_fresh = is_fresh,
1739 "Cache hit (stale)"
1740 );
1741 }
1742
1743 Ok(None)
1745 }
1746
1747 fn response_cache_filter(
1752 &self,
1753 _session: &Session,
1754 resp: &ResponseHeader,
1755 ctx: &mut Self::CTX,
1756 ) -> Result<RespCacheable> {
1757 let route_id = match ctx.route_id.as_deref() {
1758 Some(id) => id,
1759 None => {
1760 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1761 "no_route",
1762 )));
1763 }
1764 };
1765
1766 if !self.cache_manager.is_enabled(route_id) {
1768 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1769 "disabled",
1770 )));
1771 }
1772
1773 let status = resp.status.as_u16();
1774
1775 if !self.cache_manager.is_status_cacheable(route_id, status) {
1777 trace!(
1778 correlation_id = %ctx.trace_id,
1779 route_id = %route_id,
1780 status = status,
1781 "Status code not cacheable"
1782 );
1783 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1784 }
1785
1786 if let Some(cache_control) = resp.headers.get("cache-control") {
1788 if let Ok(cc_str) = cache_control.to_str() {
1789 if crate::cache::CacheManager::is_no_cache(cc_str) {
1790 trace!(
1791 correlation_id = %ctx.trace_id,
1792 route_id = %route_id,
1793 cache_control = %cc_str,
1794 "Response has no-cache directive"
1795 );
1796 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1797 }
1798 }
1799 }
1800
1801 let cache_control = resp
1803 .headers
1804 .get("cache-control")
1805 .and_then(|v| v.to_str().ok());
1806 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1807
1808 if ttl.is_zero() {
1809 trace!(
1810 correlation_id = %ctx.trace_id,
1811 route_id = %route_id,
1812 "TTL is zero, not caching"
1813 );
1814 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1815 }
1816
1817 let config = self
1819 .cache_manager
1820 .get_route_config(route_id)
1821 .unwrap_or_default();
1822
1823 let now = std::time::SystemTime::now();
1825 let fresh_until = now + ttl;
1826
1827 let header = resp.clone();
1829
1830 let cache_meta = CacheMeta::new(
1832 fresh_until,
1833 now,
1834 config.stale_while_revalidate_secs as u32,
1835 config.stale_if_error_secs as u32,
1836 header,
1837 );
1838
1839 self.cache_manager.stats().record_store();
1841
1842 debug!(
1843 correlation_id = %ctx.trace_id,
1844 route_id = %route_id,
1845 status = status,
1846 ttl_secs = ttl.as_secs(),
1847 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1848 stale_if_error_secs = config.stale_if_error_secs,
1849 "Caching response"
1850 );
1851
1852 Ok(RespCacheable::Cacheable(cache_meta))
1853 }
1854
1855 fn should_serve_stale(
1859 &self,
1860 _session: &mut Session,
1861 ctx: &mut Self::CTX,
1862 error: Option<&Error>,
1863 ) -> bool {
1864 let route_id = match ctx.route_id.as_deref() {
1865 Some(id) => id,
1866 None => return false,
1867 };
1868
1869 let config = match self.cache_manager.get_route_config(route_id) {
1871 Some(c) => c,
1872 None => return false,
1873 };
1874
1875 if let Some(e) = error {
1877 if e.esource() == &pingora::ErrorSource::Upstream {
1879 debug!(
1880 correlation_id = %ctx.trace_id,
1881 route_id = %route_id,
1882 error = %e,
1883 stale_if_error_secs = config.stale_if_error_secs,
1884 "Considering stale-if-error"
1885 );
1886 return config.stale_if_error_secs > 0;
1887 }
1888 }
1889
1890 if error.is_none() && config.stale_while_revalidate_secs > 0 {
1892 trace!(
1893 correlation_id = %ctx.trace_id,
1894 route_id = %route_id,
1895 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1896 "Allowing stale-while-revalidate"
1897 );
1898 return true;
1899 }
1900
1901 false
1902 }
1903
1904 fn range_header_filter(
1914 &self,
1915 session: &mut Session,
1916 response: &mut ResponseHeader,
1917 ctx: &mut Self::CTX,
1918 ) -> pingora_proxy::RangeType
1919 where
1920 Self::CTX: Send + Sync,
1921 {
1922 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1924 matches!(
1926 config.service_type,
1927 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1928 )
1929 });
1930
1931 if !supports_range {
1932 trace!(
1933 correlation_id = %ctx.trace_id,
1934 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1935 "Range request not supported for this route type"
1936 );
1937 return pingora_proxy::RangeType::None;
1938 }
1939
1940 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1942
1943 match &range_type {
1944 pingora_proxy::RangeType::None => {
1945 trace!(
1946 correlation_id = %ctx.trace_id,
1947 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1948 "No range request or not applicable"
1949 );
1950 }
1951 pingora_proxy::RangeType::Single(range) => {
1952 trace!(
1953 correlation_id = %ctx.trace_id,
1954 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1955 range_start = range.start,
1956 range_end = range.end,
1957 "Processing single-range request"
1958 );
1959 }
1960 pingora_proxy::RangeType::Multi(multi) => {
1961 trace!(
1962 correlation_id = %ctx.trace_id,
1963 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1964 range_count = multi.ranges.len(),
1965 "Processing multi-range request"
1966 );
1967 }
1968 pingora_proxy::RangeType::Invalid => {
1969 debug!(
1970 correlation_id = %ctx.trace_id,
1971 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1972 "Invalid range header"
1973 );
1974 }
1975 }
1976
1977 range_type
1978 }
1979
1980 async fn fail_to_proxy(
1983 &self,
1984 session: &mut Session,
1985 e: &Error,
1986 ctx: &mut Self::CTX,
1987 ) -> pingora_proxy::FailToProxy
1988 where
1989 Self::CTX: Send + Sync,
1990 {
1991 let error_code = match e.etype() {
1992 ErrorType::ConnectRefused => 503,
1994 ErrorType::ConnectTimedout => 504,
1995 ErrorType::ConnectNoRoute => 502,
1996
1997 ErrorType::ReadTimedout => 504,
1999 ErrorType::WriteTimedout => 504,
2000
2001 ErrorType::TLSHandshakeFailure => 502,
2003 ErrorType::InvalidCert => 502,
2004
2005 ErrorType::InvalidHTTPHeader => 400,
2007 ErrorType::H2Error => 502,
2008
2009 ErrorType::ConnectProxyFailure => 502,
2011 ErrorType::ConnectionClosed => 502,
2012
2013 ErrorType::HTTPStatus(status) => *status,
2015
2016 ErrorType::InternalError => {
2018 let error_str = e.to_string();
2020 if error_str.contains("upstream")
2021 || error_str.contains("DNS")
2022 || error_str.contains("resolve")
2023 {
2024 502
2025 } else {
2026 500
2027 }
2028 }
2029
2030 _ => 502,
2032 };
2033
2034 error!(
2035 correlation_id = %ctx.trace_id,
2036 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2037 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2038 error_type = ?e.etype(),
2039 error = %e,
2040 error_code = error_code,
2041 "Proxy error occurred"
2042 );
2043
2044 self.metrics
2046 .record_blocked_request(&format!("proxy_error_{}", error_code));
2047
2048 let error_message = match error_code {
2052 400 => "Bad Request",
2053 502 => "Bad Gateway",
2054 503 => "Service Unavailable",
2055 504 => "Gateway Timeout",
2056 _ => "Internal Server Error",
2057 };
2058
2059 let body = format!(
2061 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2062 error_code, error_message, ctx.trace_id
2063 );
2064
2065 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2067 header
2068 .insert_header("Content-Type", "application/json")
2069 .ok();
2070 header
2071 .insert_header("Content-Length", body.len().to_string())
2072 .ok();
2073 header
2074 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2075 .ok();
2076 header.insert_header("Connection", "close").ok();
2077
2078 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2080 warn!(
2081 correlation_id = %ctx.trace_id,
2082 error = %write_err,
2083 "Failed to write error response header"
2084 );
2085 } else {
2086 if let Err(write_err) = session
2088 .write_response_body(Some(bytes::Bytes::from(body)), true)
2089 .await
2090 {
2091 warn!(
2092 correlation_id = %ctx.trace_id,
2093 error = %write_err,
2094 "Failed to write error response body"
2095 );
2096 }
2097 }
2098
2099 pingora_proxy::FailToProxy {
2102 error_code,
2103 can_reuse_downstream: false,
2104 }
2105 }
2106
2107 fn error_while_proxy(
2113 &self,
2114 peer: &HttpPeer,
2115 session: &mut Session,
2116 e: Box<Error>,
2117 ctx: &mut Self::CTX,
2118 client_reused: bool,
2119 ) -> Box<Error> {
2120 let error_type = e.etype().clone();
2121 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2122
2123 let is_retryable = matches!(
2125 error_type,
2126 ErrorType::ConnectTimedout
2127 | ErrorType::ReadTimedout
2128 | ErrorType::WriteTimedout
2129 | ErrorType::ConnectionClosed
2130 | ErrorType::ConnectRefused
2131 );
2132
2133 warn!(
2135 correlation_id = %ctx.trace_id,
2136 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2137 upstream = %upstream_id,
2138 peer_address = %peer.address(),
2139 error_type = ?error_type,
2140 error = %e,
2141 client_reused = client_reused,
2142 is_retryable = is_retryable,
2143 "Error during proxy operation"
2144 );
2145
2146 let peer_address = peer.address().to_string();
2149 let upstream_pools = self.upstream_pools.clone();
2150 let upstream_id_owned = upstream_id.to_string();
2151 tokio::spawn(async move {
2152 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2153 pool.report_result(&peer_address, false).await;
2154 }
2155 });
2156
2157 self.metrics
2159 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2160
2161 let mut enhanced_error = e.more_context(format!(
2163 "Upstream: {}, Peer: {}, Attempts: {}",
2164 upstream_id,
2165 peer.address(),
2166 ctx.upstream_attempts
2167 ));
2168
2169 if is_retryable {
2174 let can_retry = if client_reused {
2175 !session.as_ref().retry_buffer_truncated()
2177 } else {
2178 true
2180 };
2181
2182 enhanced_error.retry.decide_reuse(can_retry);
2183
2184 if can_retry {
2185 debug!(
2186 correlation_id = %ctx.trace_id,
2187 upstream = %upstream_id,
2188 error_type = ?error_type,
2189 "Error is retryable, will attempt retry"
2190 );
2191 }
2192 } else {
2193 enhanced_error.retry.decide_reuse(false);
2195 }
2196
2197 enhanced_error
2198 }
2199
2200 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2201 self.reload_coordinator.dec_requests();
2203
2204 let duration = ctx.elapsed();
2205
2206 let status = session
2208 .response_written()
2209 .map(|r| r.status.as_u16())
2210 .unwrap_or(0);
2211
2212 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2215 (&ctx.selected_upstream_address, &ctx.upstream)
2216 {
2217 let success = status > 0 && status < 500;
2219
2220 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2221 pool.report_result_with_latency(peer_addr, success, Some(duration))
2222 .await;
2223 trace!(
2224 correlation_id = %ctx.trace_id,
2225 upstream = %upstream_id,
2226 peer_address = %peer_addr,
2227 success = success,
2228 duration_ms = duration.as_millis(),
2229 status = status,
2230 "Reported result to adaptive load balancer"
2231 );
2232 }
2233 }
2234
2235 if self.log_manager.access_log_enabled() {
2237 let access_entry = AccessLogEntry {
2238 timestamp: chrono::Utc::now().to_rfc3339(),
2239 trace_id: ctx.trace_id.clone(),
2240 method: ctx.method.clone(),
2241 path: ctx.path.clone(),
2242 query: ctx.query.clone(),
2243 protocol: "HTTP/1.1".to_string(),
2244 status,
2245 body_bytes: ctx.response_bytes,
2246 duration_ms: duration.as_millis() as u64,
2247 client_ip: ctx.client_ip.clone(),
2248 user_agent: ctx.user_agent.clone(),
2249 referer: ctx.referer.clone(),
2250 host: ctx.host.clone(),
2251 route_id: ctx.route_id.clone(),
2252 upstream: ctx.upstream.clone(),
2253 upstream_attempts: ctx.upstream_attempts,
2254 instance_id: self.app_state.instance_id.clone(),
2255 namespace: ctx.namespace.clone(),
2256 service: ctx.service.clone(),
2257 body_bytes_sent: ctx.response_bytes,
2259 upstream_addr: ctx.selected_upstream_address.clone(),
2260 connection_reused: ctx.connection_reused,
2261 rate_limit_hit: status == 429,
2262 geo_country: None, };
2264 self.log_manager.log_access(&access_entry);
2265 }
2266
2267 if tracing::enabled!(tracing::Level::DEBUG) {
2269 debug!(
2270 trace_id = %ctx.trace_id,
2271 method = %ctx.method,
2272 path = %ctx.path,
2273 route_id = ?ctx.route_id,
2274 upstream = ?ctx.upstream,
2275 status = status,
2276 duration_ms = duration.as_millis() as u64,
2277 upstream_attempts = ctx.upstream_attempts,
2278 error = ?_error.map(|e| e.to_string()),
2279 "Request completed"
2280 );
2281 }
2282
2283 if ctx.is_websocket_upgrade && status == 101 {
2285 info!(
2286 trace_id = %ctx.trace_id,
2287 route_id = ?ctx.route_id,
2288 upstream = ?ctx.upstream,
2289 client_ip = %ctx.client_ip,
2290 "WebSocket connection established"
2291 );
2292 }
2293
2294 if let Some(span) = ctx.otel_span.take() {
2296 span.end();
2297 }
2298 }
2299}
2300
2301impl SentinelProxy {
2306 async fn process_body_chunk_streaming(
2308 &self,
2309 body: &mut Option<Bytes>,
2310 end_of_stream: bool,
2311 ctx: &mut RequestContext,
2312 ) -> Result<(), Box<Error>> {
2313 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2315 let chunk_index = ctx.request_body_chunk_index;
2316 ctx.request_body_chunk_index += 1;
2317 ctx.body_bytes_inspected += chunk_data.len() as u64;
2318
2319 debug!(
2320 correlation_id = %ctx.trace_id,
2321 chunk_index = chunk_index,
2322 chunk_size = chunk_data.len(),
2323 end_of_stream = end_of_stream,
2324 "Streaming body chunk to agents"
2325 );
2326
2327 let agent_ctx = crate::agents::AgentCallContext {
2329 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2330 metadata: sentinel_agent_protocol::RequestMetadata {
2331 correlation_id: ctx.trace_id.clone(),
2332 request_id: ctx.trace_id.clone(),
2333 client_ip: ctx.client_ip.clone(),
2334 client_port: 0,
2335 server_name: ctx.host.clone(),
2336 protocol: "HTTP/1.1".to_string(),
2337 tls_version: None,
2338 tls_cipher: None,
2339 route_id: ctx.route_id.clone(),
2340 upstream_id: ctx.upstream.clone(),
2341 timestamp: chrono::Utc::now().to_rfc3339(),
2342 traceparent: ctx.traceparent(),
2343 },
2344 route_id: ctx.route_id.clone(),
2345 upstream_id: ctx.upstream.clone(),
2346 request_body: None, response_body: None,
2348 };
2349
2350 let agent_ids = ctx.body_inspection_agents.clone();
2351 let total_size = None; match self
2354 .agent_manager
2355 .process_request_body_streaming(
2356 &agent_ctx,
2357 &chunk_data,
2358 end_of_stream,
2359 chunk_index,
2360 ctx.body_bytes_inspected as usize,
2361 total_size,
2362 &agent_ids,
2363 )
2364 .await
2365 {
2366 Ok(decision) => {
2367 ctx.agent_needs_more = decision.needs_more;
2369
2370 if let Some(ref mutation) = decision.request_body_mutation {
2372 if !mutation.is_pass_through() {
2373 if mutation.is_drop() {
2374 *body = None;
2376 trace!(
2377 correlation_id = %ctx.trace_id,
2378 chunk_index = chunk_index,
2379 "Agent dropped body chunk"
2380 );
2381 } else if let Some(ref new_data) = mutation.data {
2382 *body = Some(Bytes::from(new_data.clone()));
2384 trace!(
2385 correlation_id = %ctx.trace_id,
2386 chunk_index = chunk_index,
2387 original_size = chunk_data.len(),
2388 new_size = new_data.len(),
2389 "Agent mutated body chunk"
2390 );
2391 }
2392 }
2393 }
2394
2395 if !decision.needs_more && !decision.is_allow() {
2397 warn!(
2398 correlation_id = %ctx.trace_id,
2399 action = ?decision.action,
2400 "Agent blocked request body"
2401 );
2402 self.metrics.record_blocked_request("agent_body_inspection");
2403
2404 let (status, message) = match &decision.action {
2405 crate::agents::AgentAction::Block { status, body, .. } => (
2406 *status,
2407 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2408 ),
2409 _ => (403, "Forbidden".to_string()),
2410 };
2411
2412 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2413 }
2414
2415 trace!(
2416 correlation_id = %ctx.trace_id,
2417 needs_more = decision.needs_more,
2418 "Agent processed body chunk"
2419 );
2420 }
2421 Err(e) => {
2422 let fail_closed = ctx
2423 .route_config
2424 .as_ref()
2425 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2426 .unwrap_or(false);
2427
2428 if fail_closed {
2429 error!(
2430 correlation_id = %ctx.trace_id,
2431 error = %e,
2432 "Agent streaming body inspection failed, blocking (fail-closed)"
2433 );
2434 return Err(Error::explain(
2435 ErrorType::HTTPStatus(503),
2436 "Service unavailable",
2437 ));
2438 } else {
2439 warn!(
2440 correlation_id = %ctx.trace_id,
2441 error = %e,
2442 "Agent streaming body inspection failed, allowing (fail-open)"
2443 );
2444 }
2445 }
2446 }
2447
2448 Ok(())
2449 }
2450
2451 async fn send_buffered_body_to_agents(
2453 &self,
2454 end_of_stream: bool,
2455 ctx: &mut RequestContext,
2456 ) -> Result<(), Box<Error>> {
2457 debug!(
2458 correlation_id = %ctx.trace_id,
2459 buffer_size = ctx.body_buffer.len(),
2460 end_of_stream = end_of_stream,
2461 agent_count = ctx.body_inspection_agents.len(),
2462 decompression_enabled = ctx.decompression_enabled,
2463 "Sending buffered body to agents for inspection"
2464 );
2465
2466 let body_for_inspection = if ctx.decompression_enabled {
2468 if let Some(ref encoding) = ctx.body_content_encoding {
2469 let config = crate::decompression::DecompressionConfig {
2470 max_ratio: ctx.max_decompression_ratio,
2471 max_output_bytes: ctx.max_decompression_bytes,
2472 };
2473
2474 match crate::decompression::decompress_body(
2475 &ctx.body_buffer,
2476 encoding,
2477 &config,
2478 ) {
2479 Ok(result) => {
2480 ctx.body_was_decompressed = true;
2481 self.metrics
2482 .record_decompression_success(encoding, result.ratio);
2483 debug!(
2484 correlation_id = %ctx.trace_id,
2485 encoding = %encoding,
2486 compressed_size = result.compressed_size,
2487 decompressed_size = result.decompressed_size,
2488 ratio = result.ratio,
2489 "Body decompressed for agent inspection"
2490 );
2491 result.data
2492 }
2493 Err(e) => {
2494 let failure_reason = match &e {
2496 crate::decompression::DecompressionError::RatioExceeded { .. } => {
2497 "ratio_exceeded"
2498 }
2499 crate::decompression::DecompressionError::SizeExceeded { .. } => {
2500 "size_exceeded"
2501 }
2502 crate::decompression::DecompressionError::InvalidData { .. } => {
2503 "invalid_data"
2504 }
2505 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2506 "unsupported"
2507 }
2508 crate::decompression::DecompressionError::IoError(_) => "io_error",
2509 };
2510 self.metrics
2511 .record_decompression_failure(encoding, failure_reason);
2512
2513 let fail_closed = ctx
2515 .route_config
2516 .as_ref()
2517 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2518 .unwrap_or(false);
2519
2520 if fail_closed {
2521 error!(
2522 correlation_id = %ctx.trace_id,
2523 error = %e,
2524 encoding = %encoding,
2525 "Decompression failed, blocking (fail-closed)"
2526 );
2527 return Err(Error::explain(
2528 ErrorType::HTTPStatus(400),
2529 "Invalid compressed body",
2530 ));
2531 } else {
2532 warn!(
2533 correlation_id = %ctx.trace_id,
2534 error = %e,
2535 encoding = %encoding,
2536 "Decompression failed, sending compressed body (fail-open)"
2537 );
2538 ctx.body_buffer.clone()
2539 }
2540 }
2541 }
2542 } else {
2543 ctx.body_buffer.clone()
2544 }
2545 } else {
2546 ctx.body_buffer.clone()
2547 };
2548
2549 let agent_ctx = crate::agents::AgentCallContext {
2550 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2551 metadata: sentinel_agent_protocol::RequestMetadata {
2552 correlation_id: ctx.trace_id.clone(),
2553 request_id: ctx.trace_id.clone(),
2554 client_ip: ctx.client_ip.clone(),
2555 client_port: 0,
2556 server_name: ctx.host.clone(),
2557 protocol: "HTTP/1.1".to_string(),
2558 tls_version: None,
2559 tls_cipher: None,
2560 route_id: ctx.route_id.clone(),
2561 upstream_id: ctx.upstream.clone(),
2562 timestamp: chrono::Utc::now().to_rfc3339(),
2563 traceparent: ctx.traceparent(),
2564 },
2565 route_id: ctx.route_id.clone(),
2566 upstream_id: ctx.upstream.clone(),
2567 request_body: Some(body_for_inspection.clone()),
2568 response_body: None,
2569 };
2570
2571 let agent_ids = ctx.body_inspection_agents.clone();
2572 match self
2573 .agent_manager
2574 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2575 .await
2576 {
2577 Ok(decision) => {
2578 if !decision.is_allow() {
2579 warn!(
2580 correlation_id = %ctx.trace_id,
2581 action = ?decision.action,
2582 "Agent blocked request body"
2583 );
2584 self.metrics.record_blocked_request("agent_body_inspection");
2585
2586 let (status, message) = match &decision.action {
2587 crate::agents::AgentAction::Block { status, body, .. } => (
2588 *status,
2589 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2590 ),
2591 _ => (403, "Forbidden".to_string()),
2592 };
2593
2594 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2595 }
2596
2597 trace!(
2598 correlation_id = %ctx.trace_id,
2599 "Agent allowed request body"
2600 );
2601 }
2602 Err(e) => {
2603 let fail_closed = ctx
2604 .route_config
2605 .as_ref()
2606 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2607 .unwrap_or(false);
2608
2609 if fail_closed {
2610 error!(
2611 correlation_id = %ctx.trace_id,
2612 error = %e,
2613 "Agent body inspection failed, blocking (fail-closed)"
2614 );
2615 return Err(Error::explain(
2616 ErrorType::HTTPStatus(503),
2617 "Service unavailable",
2618 ));
2619 } else {
2620 warn!(
2621 correlation_id = %ctx.trace_id,
2622 error = %e,
2623 "Agent body inspection failed, allowing (fail-open)"
2624 );
2625 }
2626 }
2627 }
2628
2629 Ok(())
2630 }
2631}