1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
23use crate::rate_limit::HeaderAccessor;
24use crate::routing::RequestInfo;
25
26use super::context::RequestContext;
27use super::SentinelProxy;
28
29struct NoHeaderAccessor;
31impl HeaderAccessor for NoHeaderAccessor {
32 fn get_header(&self, _name: &str) -> Option<String> {
33 None
34 }
35}
36
37#[async_trait]
38impl ProxyHttp for SentinelProxy {
39 type CTX = RequestContext;
40
41 fn new_ctx(&self) -> Self::CTX {
42 RequestContext::new()
43 }
44
45 fn fail_to_connect(
46 &self,
47 _session: &mut Session,
48 peer: &HttpPeer,
49 ctx: &mut Self::CTX,
50 e: Box<Error>,
51 ) -> Box<Error> {
52 error!(
53 correlation_id = %ctx.trace_id,
54 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
55 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
56 peer_address = %peer.address(),
57 error = %e,
58 "Failed to connect to upstream peer"
59 );
60 e
62 }
63
64 async fn early_request_filter(
67 &self,
68 session: &mut Session,
69 ctx: &mut Self::CTX,
70 ) -> Result<(), Box<Error>> {
71 let req_header = session.req_header();
73 let method = req_header.method.as_str();
74 let path = req_header.uri.path();
75 let host = req_header
76 .headers
77 .get("host")
78 .and_then(|h| h.to_str().ok())
79 .unwrap_or("");
80
81 ctx.method = method.to_string();
82 ctx.path = path.to_string();
83 ctx.host = Some(host.to_string());
84
85 let route_match = {
87 let route_matcher = self.route_matcher.read();
88 let request_info = RequestInfo::new(method, path, host);
89 match route_matcher.match_request(&request_info) {
90 Some(m) => m,
91 None => return Ok(()), }
93 };
94
95 ctx.trace_id = self.get_trace_id(session);
96 ctx.route_id = Some(route_match.route_id.to_string());
97 ctx.route_config = Some(route_match.config.clone());
98
99 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
101 if let Ok(s) = traceparent.to_str() {
102 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
103 }
104 }
105
106 if let Some(tracer) = crate::otel::get_tracer() {
108 ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
109 }
110
111 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
113 trace!(
114 correlation_id = %ctx.trace_id,
115 route_id = %route_match.route_id,
116 builtin_handler = ?route_match.config.builtin_handler,
117 "Handling builtin route in early_request_filter"
118 );
119
120 let handled = self
122 .handle_builtin_route(session, ctx, &route_match)
123 .await?;
124
125 if handled {
126 return Err(Error::explain(
128 ErrorType::InternalError,
129 "Builtin handler complete",
130 ));
131 }
132 }
133
134 Ok(())
135 }
136
137 async fn upstream_peer(
138 &self,
139 session: &mut Session,
140 ctx: &mut Self::CTX,
141 ) -> Result<Box<HttpPeer>, Box<Error>> {
142 self.reload_coordinator.inc_requests();
144
145 if ctx.config.is_none() {
147 ctx.config = Some(self.config_manager.current());
148 }
149
150 if ctx.client_ip.is_empty() {
152 ctx.client_ip = session
153 .client_addr()
154 .map(|a| a.to_string())
155 .unwrap_or_else(|| "unknown".to_string());
156 }
157
158 let req_header = session.req_header();
159
160 if ctx.method.is_empty() {
162 ctx.method = req_header.method.to_string();
163 ctx.path = req_header.uri.path().to_string();
164 ctx.query = req_header.uri.query().map(|q| q.to_string());
165 ctx.host = req_header
166 .headers
167 .get("host")
168 .and_then(|v| v.to_str().ok())
169 .map(|s| s.to_string());
170 }
171 ctx.user_agent = req_header
172 .headers
173 .get("user-agent")
174 .and_then(|v| v.to_str().ok())
175 .map(|s| s.to_string());
176 ctx.referer = req_header
177 .headers
178 .get("referer")
179 .and_then(|v| v.to_str().ok())
180 .map(|s| s.to_string());
181
182 trace!(
183 correlation_id = %ctx.trace_id,
184 client_ip = %ctx.client_ip,
185 "Request received, initializing context"
186 );
187
188 let route_match = if let Some(ref route_config) = ctx.route_config {
190 let route_id = ctx.route_id.as_deref().unwrap_or("");
191 crate::routing::RouteMatch {
192 route_id: sentinel_common::RouteId::new(route_id),
193 config: route_config.clone(),
194 }
195 } else {
196 let (match_result, route_duration) = {
198 let route_matcher = self.route_matcher.read();
199 let host = ctx.host.as_deref().unwrap_or("");
200
201 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
203
204 if route_matcher.needs_headers() {
206 request_info = request_info
207 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
208 }
209
210 if route_matcher.needs_query_params() {
212 request_info =
213 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
214 }
215
216 trace!(
217 correlation_id = %ctx.trace_id,
218 method = %request_info.method,
219 path = %request_info.path,
220 host = %request_info.host,
221 "Built request info for route matching"
222 );
223
224 let route_start = std::time::Instant::now();
225 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
226 warn!(
227 correlation_id = %ctx.trace_id,
228 method = %request_info.method,
229 path = %request_info.path,
230 host = %request_info.host,
231 "No matching route found for request"
232 );
233 Error::explain(ErrorType::InternalError, "No matching route found")
234 })?;
235 let route_duration = route_start.elapsed();
236 (route_match, route_duration)
238 };
239
240 ctx.route_id = Some(match_result.route_id.to_string());
241 ctx.route_config = Some(match_result.config.clone());
242
243 if ctx.trace_id.is_empty() {
245 ctx.trace_id = self.get_trace_id(session);
246
247 if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
249 {
250 if let Ok(s) = traceparent.to_str() {
251 ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
252 }
253 }
254
255 if let Some(tracer) = crate::otel::get_tracer() {
257 ctx.otel_span = Some(tracer.start_span(
258 &ctx.method,
259 &ctx.path,
260 ctx.trace_context.as_ref(),
261 ));
262 }
263 }
264
265 trace!(
266 correlation_id = %ctx.trace_id,
267 route_id = %match_result.route_id,
268 route_duration_us = route_duration.as_micros(),
269 service_type = ?match_result.config.service_type,
270 "Route matched"
271 );
272 match_result
273 };
274
275 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
277 trace!(
278 correlation_id = %ctx.trace_id,
279 route_id = %route_match.route_id,
280 builtin_handler = ?route_match.config.builtin_handler,
281 "Route type is builtin, skipping upstream"
282 );
283 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
285 return Err(Error::explain(
287 ErrorType::InternalError,
288 "Builtin handler handled in request_filter",
289 ));
290 }
291
292 if route_match.config.service_type == sentinel_config::ServiceType::Static {
294 trace!(
295 correlation_id = %ctx.trace_id,
296 route_id = %route_match.route_id,
297 "Route type is static, checking for static server"
298 );
299 if self
301 .static_servers
302 .get(route_match.route_id.as_str())
303 .await
304 .is_some()
305 {
306 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
308 info!(
309 correlation_id = %ctx.trace_id,
310 route_id = %route_match.route_id,
311 path = %ctx.path,
312 "Serving static file"
313 );
314 return Err(Error::explain(
316 ErrorType::InternalError,
317 "Static file serving handled in request_filter",
318 ));
319 }
320 }
321
322 if let Some(ref upstream) = route_match.config.upstream {
324 ctx.upstream = Some(upstream.clone());
325 trace!(
326 correlation_id = %ctx.trace_id,
327 route_id = %route_match.route_id,
328 upstream = %upstream,
329 "Upstream configured for route"
330 );
331 } else {
332 error!(
333 correlation_id = %ctx.trace_id,
334 route_id = %route_match.route_id,
335 "Route has no upstream configured"
336 );
337 return Err(Error::explain(
338 ErrorType::InternalError,
339 format!(
340 "Route '{}' has no upstream configured",
341 route_match.route_id
342 ),
343 ));
344 }
345
346 debug!(
347 correlation_id = %ctx.trace_id,
348 route_id = %route_match.route_id,
349 upstream = ?ctx.upstream,
350 method = %req_header.method,
351 path = %req_header.uri.path(),
352 host = ctx.host.as_deref().unwrap_or("-"),
353 client_ip = %ctx.client_ip,
354 "Processing request"
355 );
356
357 if ctx
359 .upstream
360 .as_ref()
361 .is_some_and(|u| u.starts_with("_static_"))
362 {
363 return Err(Error::explain(
365 ErrorType::InternalError,
366 "Static route should be handled in request_filter",
367 ));
368 }
369
370 let upstream_name = ctx
371 .upstream
372 .as_ref()
373 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
374
375 trace!(
376 correlation_id = %ctx.trace_id,
377 upstream = %upstream_name,
378 "Looking up upstream pool"
379 );
380
381 let pool = self
382 .upstream_pools
383 .get(upstream_name)
384 .await
385 .ok_or_else(|| {
386 error!(
387 correlation_id = %ctx.trace_id,
388 upstream = %upstream_name,
389 "Upstream pool not found"
390 );
391 Error::explain(
392 ErrorType::InternalError,
393 format!("Upstream pool '{}' not found", upstream_name),
394 )
395 })?;
396
397 let max_retries = route_match
399 .config
400 .retry_policy
401 .as_ref()
402 .map(|r| r.max_attempts)
403 .unwrap_or(1);
404
405 trace!(
406 correlation_id = %ctx.trace_id,
407 upstream = %upstream_name,
408 max_retries = max_retries,
409 "Starting upstream peer selection"
410 );
411
412 let mut last_error = None;
413 let selection_start = std::time::Instant::now();
414
415 for attempt in 1..=max_retries {
416 ctx.upstream_attempts = attempt;
417
418 trace!(
419 correlation_id = %ctx.trace_id,
420 upstream = %upstream_name,
421 attempt = attempt,
422 max_retries = max_retries,
423 "Attempting to select upstream peer"
424 );
425
426 match pool.select_peer(None).await {
427 Ok(peer) => {
428 let selection_duration = selection_start.elapsed();
429 let peer_addr = peer.address().to_string();
431 ctx.selected_upstream_address = Some(peer_addr.clone());
432 debug!(
433 correlation_id = %ctx.trace_id,
434 upstream = %upstream_name,
435 peer_address = %peer_addr,
436 attempt = attempt,
437 selection_duration_us = selection_duration.as_micros(),
438 "Selected upstream peer"
439 );
440 return Ok(Box::new(peer));
441 }
442 Err(e) => {
443 warn!(
444 correlation_id = %ctx.trace_id,
445 upstream = %upstream_name,
446 attempt = attempt,
447 max_retries = max_retries,
448 error = %e,
449 "Failed to select upstream peer"
450 );
451 last_error = Some(e);
452
453 if attempt < max_retries {
454 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
456 trace!(
457 correlation_id = %ctx.trace_id,
458 backoff_ms = backoff.as_millis(),
459 "Backing off before retry"
460 );
461 sleep(backoff).await;
462 }
463 }
464 }
465 }
466
467 let selection_duration = selection_start.elapsed();
468 error!(
469 correlation_id = %ctx.trace_id,
470 upstream = %upstream_name,
471 attempts = max_retries,
472 selection_duration_ms = selection_duration.as_millis(),
473 last_error = ?last_error,
474 "All upstream selection attempts failed"
475 );
476
477 Err(Error::explain(
478 ErrorType::InternalError,
479 format!("All upstream attempts failed: {:?}", last_error),
480 ))
481 }
482
483 async fn request_filter(
484 &self,
485 session: &mut Session,
486 ctx: &mut Self::CTX,
487 ) -> Result<bool, Box<Error>> {
488 trace!(
489 correlation_id = %ctx.trace_id,
490 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
491 "Starting request filter phase"
492 );
493
494 if let Some(route_id) = ctx.route_id.as_deref() {
497 if self.rate_limit_manager.has_route_limiter(route_id) {
498 let rate_result = self.rate_limit_manager.check(
499 route_id,
500 &ctx.client_ip,
501 &ctx.path,
502 Option::<&NoHeaderAccessor>::None,
503 );
504
505 if rate_result.limit > 0 {
507 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
508 limit: rate_result.limit,
509 remaining: rate_result.remaining,
510 reset_at: rate_result.reset_at,
511 });
512 }
513
514 if !rate_result.allowed {
515 use sentinel_config::RateLimitAction;
516
517 match rate_result.action {
518 RateLimitAction::Reject => {
519 warn!(
520 correlation_id = %ctx.trace_id,
521 route_id = route_id,
522 client_ip = %ctx.client_ip,
523 limiter = %rate_result.limiter,
524 limit = rate_result.limit,
525 remaining = rate_result.remaining,
526 "Request rate limited"
527 );
528 self.metrics.record_blocked_request("rate_limited");
529
530 let audit_entry = AuditLogEntry::rate_limited(
532 &ctx.trace_id,
533 &ctx.method,
534 &ctx.path,
535 &ctx.client_ip,
536 &rate_result.limiter,
537 )
538 .with_route_id(route_id)
539 .with_status_code(rate_result.status_code);
540 self.log_manager.log_audit(&audit_entry);
541
542 let body = rate_result
544 .message
545 .unwrap_or_else(|| "Rate limit exceeded".to_string());
546
547 let retry_after = rate_result.reset_at.saturating_sub(
549 std::time::SystemTime::now()
550 .duration_since(std::time::UNIX_EPOCH)
551 .unwrap_or_default()
552 .as_secs(),
553 );
554 crate::http_helpers::write_rate_limit_error(
555 session,
556 rate_result.status_code,
557 &body,
558 rate_result.limit,
559 rate_result.remaining,
560 rate_result.reset_at,
561 retry_after,
562 )
563 .await?;
564 return Ok(true); }
566 RateLimitAction::LogOnly => {
567 debug!(
568 correlation_id = %ctx.trace_id,
569 route_id = route_id,
570 "Rate limit exceeded (log only mode)"
571 );
572 }
574 RateLimitAction::Delay => {
575 if let Some(delay_ms) = rate_result.suggested_delay_ms {
577 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
579
580 if actual_delay > 0 {
581 debug!(
582 correlation_id = %ctx.trace_id,
583 route_id = route_id,
584 suggested_delay_ms = delay_ms,
585 max_delay_ms = rate_result.max_delay_ms,
586 actual_delay_ms = actual_delay,
587 "Applying rate limit delay"
588 );
589
590 tokio::time::sleep(std::time::Duration::from_millis(
591 actual_delay,
592 ))
593 .await;
594 }
595 }
596 }
598 }
599 }
600 }
601 }
602
603 if let Some(route_id) = ctx.route_id.as_deref() {
605 if let Some(ref route_config) = ctx.route_config {
606 for filter_id in &route_config.filters {
607 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
608 ctx.geo_country_code = result.country_code.clone();
610 ctx.geo_lookup_performed = true;
611
612 if !result.allowed {
613 warn!(
614 correlation_id = %ctx.trace_id,
615 route_id = route_id,
616 client_ip = %ctx.client_ip,
617 country = ?result.country_code,
618 filter_id = %filter_id,
619 "Request blocked by geo filter"
620 );
621 self.metrics.record_blocked_request("geo_blocked");
622
623 let audit_entry = AuditLogEntry::new(
625 &ctx.trace_id,
626 AuditEventType::Blocked,
627 &ctx.method,
628 &ctx.path,
629 &ctx.client_ip,
630 )
631 .with_route_id(route_id)
632 .with_status_code(result.status_code)
633 .with_reason(format!(
634 "Geo blocked: country={}, filter={}",
635 result.country_code.as_deref().unwrap_or("unknown"),
636 filter_id
637 ));
638 self.log_manager.log_audit(&audit_entry);
639
640 let body = result
642 .block_message
643 .unwrap_or_else(|| "Access denied".to_string());
644
645 crate::http_helpers::write_error(
646 session,
647 result.status_code,
648 &body,
649 "text/plain",
650 )
651 .await?;
652 return Ok(true); }
654
655 break;
657 }
658 }
659 }
660 }
661
662 let is_websocket_upgrade = session
664 .req_header()
665 .headers
666 .get(http::header::UPGRADE)
667 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
668 .unwrap_or(false);
669
670 if is_websocket_upgrade {
671 ctx.is_websocket_upgrade = true;
672
673 if let Some(ref route_config) = ctx.route_config {
675 if !route_config.websocket {
676 warn!(
677 correlation_id = %ctx.trace_id,
678 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
679 client_ip = %ctx.client_ip,
680 "WebSocket upgrade rejected: not enabled for route"
681 );
682
683 self.metrics.record_blocked_request("websocket_not_enabled");
684
685 let audit_entry = AuditLogEntry::new(
687 &ctx.trace_id,
688 AuditEventType::Blocked,
689 &ctx.method,
690 &ctx.path,
691 &ctx.client_ip,
692 )
693 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
694 .with_action("websocket_rejected")
695 .with_reason("WebSocket not enabled for route");
696 self.log_manager.log_audit(&audit_entry);
697
698 crate::http_helpers::write_error(
700 session,
701 403,
702 "WebSocket not enabled for this route",
703 "text/plain",
704 )
705 .await?;
706 return Ok(true); }
708
709 debug!(
710 correlation_id = %ctx.trace_id,
711 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
712 "WebSocket upgrade request allowed"
713 );
714
715 if route_config.websocket_inspection {
717 let has_compression = session
719 .req_header()
720 .headers
721 .get("Sec-WebSocket-Extensions")
722 .and_then(|v| v.to_str().ok())
723 .map(|s| s.contains("permessage-deflate"))
724 .unwrap_or(false);
725
726 if has_compression {
727 debug!(
728 correlation_id = %ctx.trace_id,
729 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
730 "WebSocket inspection skipped: permessage-deflate negotiated"
731 );
732 ctx.websocket_skip_inspection = true;
733 } else {
734 ctx.websocket_inspection_enabled = true;
735
736 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
738 sentinel_agent_protocol::EventType::WebSocketFrame,
739 );
740
741 debug!(
742 correlation_id = %ctx.trace_id,
743 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
744 agent_count = ctx.websocket_inspection_agents.len(),
745 "WebSocket frame inspection enabled"
746 );
747 }
748 }
749 }
750 }
751
752 if let Some(route_config) = ctx.route_config.clone() {
755 if route_config.service_type == sentinel_config::ServiceType::Static {
756 trace!(
757 correlation_id = %ctx.trace_id,
758 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
759 "Handling static file route"
760 );
761 let route_match = crate::routing::RouteMatch {
763 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
764 config: route_config.clone(),
765 };
766 return self.handle_static_route(session, ctx, &route_match).await;
767 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
768 trace!(
769 correlation_id = %ctx.trace_id,
770 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
771 builtin_handler = ?route_config.builtin_handler,
772 "Handling builtin route"
773 );
774 let route_match = crate::routing::RouteMatch {
776 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
777 config: route_config.clone(),
778 };
779 return self.handle_builtin_route(session, ctx, &route_match).await;
780 }
781 }
782
783 if let Some(route_id) = ctx.route_id.clone() {
785 if let Some(validator) = self.validators.get(&route_id).await {
786 trace!(
787 correlation_id = %ctx.trace_id,
788 route_id = %route_id,
789 "Running API schema validation"
790 );
791 if let Some(result) = self
792 .validate_api_request(session, ctx, &route_id, &validator)
793 .await?
794 {
795 debug!(
796 correlation_id = %ctx.trace_id,
797 route_id = %route_id,
798 validation_passed = result,
799 "API validation complete"
800 );
801 return Ok(result);
802 }
803 }
804 }
805
806 let client_addr = session
808 .client_addr()
809 .map(|a| format!("{}", a))
810 .unwrap_or_else(|| "unknown".to_string());
811 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
812
813 let req_header = session.req_header_mut();
814
815 req_header
817 .insert_header("X-Correlation-Id", &ctx.trace_id)
818 .ok();
819 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
820
821 let config = ctx
823 .config
824 .get_or_insert_with(|| self.config_manager.current());
825
826 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
831 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
832 && header_count > config.limits.max_header_count
833 {
834 warn!(
835 correlation_id = %ctx.trace_id,
836 header_count = header_count,
837 limit = config.limits.max_header_count,
838 "Request blocked: exceeds header count limit"
839 );
840
841 self.metrics.record_blocked_request("header_count_exceeded");
842 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
843 }
844
845 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
847 let total_header_size: usize = req_header
848 .headers
849 .iter()
850 .map(|(k, v)| k.as_str().len() + v.len())
851 .sum();
852
853 if total_header_size > config.limits.max_header_size_bytes {
854 warn!(
855 correlation_id = %ctx.trace_id,
856 header_size = total_header_size,
857 limit = config.limits.max_header_size_bytes,
858 "Request blocked: exceeds header size limit"
859 );
860
861 self.metrics.record_blocked_request("header_size_exceeded");
862 return Err(Error::explain(
863 ErrorType::InternalError,
864 "Headers too large",
865 ));
866 }
867 }
868
869 trace!(
871 correlation_id = %ctx.trace_id,
872 "Processing request through agents"
873 );
874 if let Err(e) = self
875 .process_agents(session, ctx, &client_addr, client_port)
876 .await
877 {
878 if let ErrorType::HTTPStatus(status) = e.etype() {
881 let error_msg = e.to_string();
883 let body = error_msg
884 .split("context:")
885 .nth(1)
886 .map(|s| s.trim())
887 .unwrap_or("Request blocked");
888 debug!(
889 correlation_id = %ctx.trace_id,
890 status = status,
891 body = %body,
892 "Sending HTTP error response for agent block"
893 );
894 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
895 return Ok(true); }
897 return Err(e);
899 }
900
901 trace!(
902 correlation_id = %ctx.trace_id,
903 "Request filter phase complete, forwarding to upstream"
904 );
905
906 Ok(false) }
908
909 async fn request_body_filter(
916 &self,
917 _session: &mut Session,
918 body: &mut Option<Bytes>,
919 end_of_stream: bool,
920 ctx: &mut Self::CTX,
921 ) -> Result<(), Box<Error>> {
922 use sentinel_config::BodyStreamingMode;
923
924 if ctx.is_websocket_upgrade {
926 if let Some(ref handler) = ctx.websocket_handler {
927 let result = handler.process_client_data(body.take()).await;
928 match result {
929 crate::websocket::ProcessResult::Forward(data) => {
930 *body = data;
931 }
932 crate::websocket::ProcessResult::Close(reason) => {
933 warn!(
934 correlation_id = %ctx.trace_id,
935 code = reason.code,
936 reason = %reason.reason,
937 "WebSocket connection closed by agent (client->server)"
938 );
939 return Err(Error::explain(
941 ErrorType::InternalError,
942 format!("WebSocket closed: {} {}", reason.code, reason.reason),
943 ));
944 }
945 }
946 }
947 return Ok(());
949 }
950
951 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
953 if chunk_len > 0 {
954 ctx.request_body_bytes += chunk_len as u64;
955
956 trace!(
957 correlation_id = %ctx.trace_id,
958 chunk_size = chunk_len,
959 total_body_bytes = ctx.request_body_bytes,
960 end_of_stream = end_of_stream,
961 streaming_mode = ?ctx.request_body_streaming_mode,
962 "Processing request body chunk"
963 );
964
965 let config = ctx
967 .config
968 .get_or_insert_with(|| self.config_manager.current());
969 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
970 warn!(
971 correlation_id = %ctx.trace_id,
972 body_bytes = ctx.request_body_bytes,
973 limit = config.limits.max_body_size_bytes,
974 "Request body size limit exceeded"
975 );
976 self.metrics.record_blocked_request("body_size_exceeded");
977 return Err(Error::explain(
978 ErrorType::InternalError,
979 "Request body too large",
980 ));
981 }
982 }
983
984 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
986 let config = ctx
987 .config
988 .get_or_insert_with(|| self.config_manager.current());
989 let max_inspection_bytes = config
990 .waf
991 .as_ref()
992 .map(|w| w.body_inspection.max_inspection_bytes as u64)
993 .unwrap_or(1024 * 1024);
994
995 match ctx.request_body_streaming_mode {
996 BodyStreamingMode::Stream => {
997 if body.is_some() {
999 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1000 .await?;
1001 } else if end_of_stream && ctx.agent_needs_more {
1002 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1004 .await?;
1005 }
1006 }
1007 BodyStreamingMode::Hybrid { buffer_threshold } => {
1008 if ctx.body_bytes_inspected < buffer_threshold as u64 {
1010 if let Some(ref chunk) = body {
1012 let bytes_to_buffer = std::cmp::min(
1013 chunk.len(),
1014 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1015 );
1016 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1017 ctx.body_bytes_inspected += bytes_to_buffer as u64;
1018
1019 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1021 {
1022 self.send_buffered_body_to_agents(
1024 end_of_stream && chunk.len() == bytes_to_buffer,
1025 ctx,
1026 )
1027 .await?;
1028 ctx.body_buffer.clear();
1029
1030 if bytes_to_buffer < chunk.len() {
1032 let remaining = chunk.slice(bytes_to_buffer..);
1033 let mut remaining_body = Some(remaining);
1034 self.process_body_chunk_streaming(
1035 &mut remaining_body,
1036 end_of_stream,
1037 ctx,
1038 )
1039 .await?;
1040 }
1041 }
1042 }
1043 } else {
1044 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1046 .await?;
1047 }
1048 }
1049 BodyStreamingMode::Buffer => {
1050 if let Some(ref chunk) = body {
1052 if ctx.body_bytes_inspected < max_inspection_bytes {
1053 let bytes_to_inspect = std::cmp::min(
1054 chunk.len() as u64,
1055 max_inspection_bytes - ctx.body_bytes_inspected,
1056 ) as usize;
1057
1058 ctx.body_buffer
1059 .extend_from_slice(&chunk[..bytes_to_inspect]);
1060 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1061
1062 trace!(
1063 correlation_id = %ctx.trace_id,
1064 bytes_inspected = ctx.body_bytes_inspected,
1065 max_inspection_bytes = max_inspection_bytes,
1066 buffer_size = ctx.body_buffer.len(),
1067 "Buffering body for agent inspection"
1068 );
1069 }
1070 }
1071
1072 let should_send =
1074 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1075 if should_send && !ctx.body_buffer.is_empty() {
1076 self.send_buffered_body_to_agents(end_of_stream, ctx)
1077 .await?;
1078 ctx.body_buffer.clear();
1079 }
1080 }
1081 }
1082 }
1083
1084 if end_of_stream {
1085 trace!(
1086 correlation_id = %ctx.trace_id,
1087 total_body_bytes = ctx.request_body_bytes,
1088 bytes_inspected = ctx.body_bytes_inspected,
1089 "Request body complete"
1090 );
1091 }
1092
1093 Ok(())
1094 }
1095
1096 async fn response_filter(
1097 &self,
1098 _session: &mut Session,
1099 upstream_response: &mut ResponseHeader,
1100 ctx: &mut Self::CTX,
1101 ) -> Result<(), Box<Error>> {
1102 let status = upstream_response.status.as_u16();
1103 let duration = ctx.elapsed();
1104
1105 trace!(
1106 correlation_id = %ctx.trace_id,
1107 status = status,
1108 "Starting response filter phase"
1109 );
1110
1111 if status == 101 && ctx.is_websocket_upgrade {
1113 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1114 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1116 self.agent_manager.clone(),
1117 ctx.route_id
1118 .clone()
1119 .unwrap_or_else(|| "unknown".to_string()),
1120 ctx.trace_id.clone(),
1121 ctx.client_ip.clone(),
1122 100, Some(self.metrics.clone()),
1124 );
1125
1126 let handler = crate::websocket::WebSocketHandler::new(
1127 std::sync::Arc::new(inspector),
1128 1024 * 1024, );
1130
1131 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1132
1133 info!(
1134 correlation_id = %ctx.trace_id,
1135 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1136 agent_count = ctx.websocket_inspection_agents.len(),
1137 "WebSocket upgrade successful, frame inspection enabled"
1138 );
1139 } else if ctx.websocket_skip_inspection {
1140 debug!(
1141 correlation_id = %ctx.trace_id,
1142 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1143 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1144 );
1145 } else {
1146 debug!(
1147 correlation_id = %ctx.trace_id,
1148 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1149 "WebSocket upgrade successful"
1150 );
1151 }
1152 }
1153
1154 trace!(
1156 correlation_id = %ctx.trace_id,
1157 "Applying security headers"
1158 );
1159 self.apply_security_headers(upstream_response).ok();
1160
1161 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1163
1164 if let Some(ref rate_info) = ctx.rate_limit_info {
1166 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1167 upstream_response
1168 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1169 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1170 }
1171
1172 if let Some(ref country_code) = ctx.geo_country_code {
1174 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1175 }
1176
1177 if status >= 400 {
1179 trace!(
1180 correlation_id = %ctx.trace_id,
1181 status = status,
1182 "Handling error response"
1183 );
1184 self.handle_error_response(upstream_response, ctx).await?;
1185 }
1186
1187 self.metrics.record_request(
1189 ctx.route_id.as_deref().unwrap_or("unknown"),
1190 &ctx.method,
1191 status,
1192 duration,
1193 );
1194
1195 if let Some(ref mut span) = ctx.otel_span {
1197 span.set_status(status);
1198 if let Some(ref upstream) = ctx.upstream {
1199 span.set_upstream(upstream, "");
1200 }
1201 if status >= 500 {
1202 span.record_error(&format!("HTTP {}", status));
1203 }
1204 }
1205
1206 if let Some(ref upstream) = ctx.upstream {
1208 let success = status < 500;
1209
1210 trace!(
1211 correlation_id = %ctx.trace_id,
1212 upstream = %upstream,
1213 success = success,
1214 status = status,
1215 "Recording passive health check result"
1216 );
1217
1218 let error_msg = if !success {
1219 Some(format!("HTTP {}", status))
1220 } else {
1221 None
1222 };
1223 self.passive_health
1224 .record_outcome(upstream, success, error_msg.as_deref())
1225 .await;
1226
1227 if let Some(pool) = self.upstream_pools.get(upstream).await {
1229 pool.report_result(upstream, success).await;
1230 }
1231
1232 if !success {
1233 warn!(
1234 correlation_id = %ctx.trace_id,
1235 upstream = %upstream,
1236 status = status,
1237 "Upstream returned error status"
1238 );
1239 }
1240 }
1241
1242 if status >= 500 {
1244 error!(
1245 correlation_id = %ctx.trace_id,
1246 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1247 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1248 method = %ctx.method,
1249 path = %ctx.path,
1250 status = status,
1251 duration_ms = duration.as_millis(),
1252 attempts = ctx.upstream_attempts,
1253 "Request completed with server error"
1254 );
1255 } else if status >= 400 {
1256 warn!(
1257 correlation_id = %ctx.trace_id,
1258 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1259 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1260 method = %ctx.method,
1261 path = %ctx.path,
1262 status = status,
1263 duration_ms = duration.as_millis(),
1264 "Request completed with client error"
1265 );
1266 } else {
1267 debug!(
1268 correlation_id = %ctx.trace_id,
1269 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1271 method = %ctx.method,
1272 path = %ctx.path,
1273 status = status,
1274 duration_ms = duration.as_millis(),
1275 attempts = ctx.upstream_attempts,
1276 "Request completed"
1277 );
1278 }
1279
1280 Ok(())
1281 }
1282
1283 async fn upstream_request_filter(
1286 &self,
1287 _session: &mut Session,
1288 upstream_request: &mut pingora::http::RequestHeader,
1289 ctx: &mut Self::CTX,
1290 ) -> Result<()>
1291 where
1292 Self::CTX: Send + Sync,
1293 {
1294 trace!(
1295 correlation_id = %ctx.trace_id,
1296 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1297 "Applying upstream request modifications"
1298 );
1299
1300 upstream_request
1302 .insert_header("X-Trace-Id", &ctx.trace_id)
1303 .ok();
1304
1305 if let Some(ref span) = ctx.otel_span {
1307 let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1308 let traceparent =
1309 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1310 upstream_request
1311 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1312 .ok();
1313 }
1314
1315 upstream_request
1317 .insert_header("X-Forwarded-By", "Sentinel")
1318 .ok();
1319
1320 if let Some(ref route_config) = ctx.route_config {
1323 let mods = route_config.policies.request_headers.clone();
1324
1325 for (name, value) in mods.set {
1327 upstream_request.insert_header(name, value).ok();
1328 }
1329
1330 for (name, value) in mods.add {
1332 upstream_request.append_header(name, value).ok();
1333 }
1334
1335 for name in &mods.remove {
1337 upstream_request.remove_header(name);
1338 }
1339
1340 trace!(
1341 correlation_id = %ctx.trace_id,
1342 "Applied request header modifications"
1343 );
1344 }
1345
1346 upstream_request.remove_header("X-Internal-Token");
1348 upstream_request.remove_header("Authorization-Internal");
1349
1350 Ok(())
1351 }
1352
1353 fn response_body_filter(
1359 &self,
1360 _session: &mut Session,
1361 body: &mut Option<Bytes>,
1362 end_of_stream: bool,
1363 ctx: &mut Self::CTX,
1364 ) -> Result<Option<Duration>, Box<Error>> {
1365 if ctx.is_websocket_upgrade {
1368 if let Some(ref handler) = ctx.websocket_handler {
1369 let handler = handler.clone();
1370 let data = body.take();
1371
1372 let result = tokio::task::block_in_place(|| {
1375 tokio::runtime::Handle::current()
1376 .block_on(async { handler.process_server_data(data).await })
1377 });
1378
1379 match result {
1380 crate::websocket::ProcessResult::Forward(data) => {
1381 *body = data;
1382 }
1383 crate::websocket::ProcessResult::Close(reason) => {
1384 warn!(
1385 correlation_id = %ctx.trace_id,
1386 code = reason.code,
1387 reason = %reason.reason,
1388 "WebSocket connection closed by agent (server->client)"
1389 );
1390 let close_frame =
1393 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1394 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1395 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1396 *body = Some(Bytes::from(encoded));
1397 }
1398 }
1399 }
1400 }
1401 return Ok(None);
1403 }
1404
1405 if let Some(ref chunk) = body {
1407 ctx.response_bytes += chunk.len() as u64;
1408
1409 trace!(
1410 correlation_id = %ctx.trace_id,
1411 chunk_size = chunk.len(),
1412 total_response_bytes = ctx.response_bytes,
1413 end_of_stream = end_of_stream,
1414 "Processing response body chunk"
1415 );
1416
1417 if ctx.response_body_inspection_enabled
1421 && !ctx.response_body_inspection_agents.is_empty()
1422 {
1423 let config = ctx
1424 .config
1425 .get_or_insert_with(|| self.config_manager.current());
1426 let max_inspection_bytes = config
1427 .waf
1428 .as_ref()
1429 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1430 .unwrap_or(1024 * 1024);
1431
1432 if ctx.response_body_bytes_inspected < max_inspection_bytes {
1433 let bytes_to_inspect = std::cmp::min(
1434 chunk.len() as u64,
1435 max_inspection_bytes - ctx.response_body_bytes_inspected,
1436 ) as usize;
1437
1438 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1442 ctx.response_body_chunk_index += 1;
1443
1444 trace!(
1445 correlation_id = %ctx.trace_id,
1446 bytes_inspected = ctx.response_body_bytes_inspected,
1447 max_inspection_bytes = max_inspection_bytes,
1448 chunk_index = ctx.response_body_chunk_index,
1449 "Tracking response body for inspection"
1450 );
1451 }
1452 }
1453 }
1454
1455 if end_of_stream {
1456 trace!(
1457 correlation_id = %ctx.trace_id,
1458 total_response_bytes = ctx.response_bytes,
1459 response_bytes_inspected = ctx.response_body_bytes_inspected,
1460 "Response body complete"
1461 );
1462 }
1463
1464 Ok(None)
1466 }
1467
1468 async fn connected_to_upstream(
1471 &self,
1472 _session: &mut Session,
1473 reused: bool,
1474 peer: &HttpPeer,
1475 #[cfg(unix)] _fd: RawFd,
1476 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1477 digest: Option<&Digest>,
1478 ctx: &mut Self::CTX,
1479 ) -> Result<(), Box<Error>> {
1480 ctx.connection_reused = reused;
1482
1483 if reused {
1485 trace!(
1486 correlation_id = %ctx.trace_id,
1487 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1488 peer_address = %peer.address(),
1489 "Reusing existing upstream connection"
1490 );
1491 } else {
1492 debug!(
1493 correlation_id = %ctx.trace_id,
1494 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1495 peer_address = %peer.address(),
1496 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1497 "Established new upstream connection"
1498 );
1499 }
1500
1501 Ok(())
1502 }
1503
1504 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1514 let route_id = match ctx.route_id.as_deref() {
1516 Some(id) => id,
1517 None => {
1518 trace!(
1519 correlation_id = %ctx.trace_id,
1520 "Cache filter: no route ID, skipping cache"
1521 );
1522 return Ok(());
1523 }
1524 };
1525
1526 if !self.cache_manager.is_enabled(route_id) {
1528 trace!(
1529 correlation_id = %ctx.trace_id,
1530 route_id = %route_id,
1531 "Cache disabled for route"
1532 );
1533 return Ok(());
1534 }
1535
1536 if !self
1538 .cache_manager
1539 .is_method_cacheable(route_id, &ctx.method)
1540 {
1541 trace!(
1542 correlation_id = %ctx.trace_id,
1543 route_id = %route_id,
1544 method = %ctx.method,
1545 "Method not cacheable"
1546 );
1547 return Ok(());
1548 }
1549
1550 debug!(
1552 correlation_id = %ctx.trace_id,
1553 route_id = %route_id,
1554 method = %ctx.method,
1555 path = %ctx.path,
1556 "Enabling HTTP caching for request"
1557 );
1558
1559 let storage = get_cache_storage();
1561 let eviction = get_cache_eviction();
1562 let cache_lock = get_cache_lock();
1563
1564 session.cache.enable(
1566 storage,
1567 Some(eviction),
1568 None, Some(cache_lock),
1570 None, );
1572
1573 ctx.cache_eligible = true;
1575
1576 trace!(
1577 correlation_id = %ctx.trace_id,
1578 route_id = %route_id,
1579 cache_enabled = session.cache.enabled(),
1580 "Cache enabled for request"
1581 );
1582
1583 Ok(())
1584 }
1585
1586 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1591 let req_header = session.req_header();
1592 let method = req_header.method.as_str();
1593 let path = req_header.uri.path();
1594 let host = ctx.host.as_deref().unwrap_or("unknown");
1595 let query = req_header.uri.query();
1596
1597 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1599
1600 trace!(
1601 correlation_id = %ctx.trace_id,
1602 cache_key = %key_string,
1603 "Generated cache key"
1604 );
1605
1606 Ok(CacheKey::default(req_header))
1609 }
1610
1611 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1616 session.cache.cache_miss();
1618
1619 if let Some(route_id) = ctx.route_id.as_deref() {
1621 self.cache_manager.stats().record_miss();
1622
1623 trace!(
1624 correlation_id = %ctx.trace_id,
1625 route_id = %route_id,
1626 path = %ctx.path,
1627 "Cache miss"
1628 );
1629 }
1630 }
1631
1632 async fn cache_hit_filter(
1638 &self,
1639 session: &mut Session,
1640 meta: &CacheMeta,
1641 _hit_handler: &mut HitHandler,
1642 is_fresh: bool,
1643 ctx: &mut Self::CTX,
1644 ) -> Result<Option<ForcedInvalidationKind>>
1645 where
1646 Self::CTX: Send + Sync,
1647 {
1648 let req_header = session.req_header();
1650 let method = req_header.method.as_str();
1651 let path = req_header.uri.path();
1652 let host = req_header.uri.host().unwrap_or("localhost");
1653 let query = req_header.uri.query();
1654
1655 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1657
1658 if self.cache_manager.should_invalidate(&cache_key) {
1660 info!(
1661 correlation_id = %ctx.trace_id,
1662 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1663 cache_key = %cache_key,
1664 "Cache entry invalidated by purge request"
1665 );
1666 return Ok(Some(ForcedInvalidationKind::ForceExpired));
1668 }
1669
1670 if is_fresh {
1672 self.cache_manager.stats().record_hit();
1673
1674 debug!(
1675 correlation_id = %ctx.trace_id,
1676 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1677 is_fresh = is_fresh,
1678 "Cache hit (fresh)"
1679 );
1680 } else {
1681 trace!(
1682 correlation_id = %ctx.trace_id,
1683 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1684 is_fresh = is_fresh,
1685 "Cache hit (stale)"
1686 );
1687 }
1688
1689 Ok(None)
1691 }
1692
1693 fn response_cache_filter(
1698 &self,
1699 _session: &Session,
1700 resp: &ResponseHeader,
1701 ctx: &mut Self::CTX,
1702 ) -> Result<RespCacheable> {
1703 let route_id = match ctx.route_id.as_deref() {
1704 Some(id) => id,
1705 None => {
1706 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1707 "no_route",
1708 )));
1709 }
1710 };
1711
1712 if !self.cache_manager.is_enabled(route_id) {
1714 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1715 "disabled",
1716 )));
1717 }
1718
1719 let status = resp.status.as_u16();
1720
1721 if !self.cache_manager.is_status_cacheable(route_id, status) {
1723 trace!(
1724 correlation_id = %ctx.trace_id,
1725 route_id = %route_id,
1726 status = status,
1727 "Status code not cacheable"
1728 );
1729 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1730 }
1731
1732 if let Some(cache_control) = resp.headers.get("cache-control") {
1734 if let Ok(cc_str) = cache_control.to_str() {
1735 if crate::cache::CacheManager::is_no_cache(cc_str) {
1736 trace!(
1737 correlation_id = %ctx.trace_id,
1738 route_id = %route_id,
1739 cache_control = %cc_str,
1740 "Response has no-cache directive"
1741 );
1742 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1743 }
1744 }
1745 }
1746
1747 let cache_control = resp
1749 .headers
1750 .get("cache-control")
1751 .and_then(|v| v.to_str().ok());
1752 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1753
1754 if ttl.is_zero() {
1755 trace!(
1756 correlation_id = %ctx.trace_id,
1757 route_id = %route_id,
1758 "TTL is zero, not caching"
1759 );
1760 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1761 }
1762
1763 let config = self
1765 .cache_manager
1766 .get_route_config(route_id)
1767 .unwrap_or_default();
1768
1769 let now = std::time::SystemTime::now();
1771 let fresh_until = now + ttl;
1772
1773 let header = resp.clone();
1775
1776 let cache_meta = CacheMeta::new(
1778 fresh_until,
1779 now,
1780 config.stale_while_revalidate_secs as u32,
1781 config.stale_if_error_secs as u32,
1782 header,
1783 );
1784
1785 self.cache_manager.stats().record_store();
1787
1788 debug!(
1789 correlation_id = %ctx.trace_id,
1790 route_id = %route_id,
1791 status = status,
1792 ttl_secs = ttl.as_secs(),
1793 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1794 stale_if_error_secs = config.stale_if_error_secs,
1795 "Caching response"
1796 );
1797
1798 Ok(RespCacheable::Cacheable(cache_meta))
1799 }
1800
1801 fn should_serve_stale(
1805 &self,
1806 _session: &mut Session,
1807 ctx: &mut Self::CTX,
1808 error: Option<&Error>,
1809 ) -> bool {
1810 let route_id = match ctx.route_id.as_deref() {
1811 Some(id) => id,
1812 None => return false,
1813 };
1814
1815 let config = match self.cache_manager.get_route_config(route_id) {
1817 Some(c) => c,
1818 None => return false,
1819 };
1820
1821 if let Some(e) = error {
1823 if e.esource() == &pingora::ErrorSource::Upstream {
1825 debug!(
1826 correlation_id = %ctx.trace_id,
1827 route_id = %route_id,
1828 error = %e,
1829 stale_if_error_secs = config.stale_if_error_secs,
1830 "Considering stale-if-error"
1831 );
1832 return config.stale_if_error_secs > 0;
1833 }
1834 }
1835
1836 if error.is_none() && config.stale_while_revalidate_secs > 0 {
1838 trace!(
1839 correlation_id = %ctx.trace_id,
1840 route_id = %route_id,
1841 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1842 "Allowing stale-while-revalidate"
1843 );
1844 return true;
1845 }
1846
1847 false
1848 }
1849
1850 fn range_header_filter(
1860 &self,
1861 session: &mut Session,
1862 response: &mut ResponseHeader,
1863 ctx: &mut Self::CTX,
1864 ) -> pingora_proxy::RangeType
1865 where
1866 Self::CTX: Send + Sync,
1867 {
1868 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1870 matches!(
1872 config.service_type,
1873 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1874 )
1875 });
1876
1877 if !supports_range {
1878 trace!(
1879 correlation_id = %ctx.trace_id,
1880 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1881 "Range request not supported for this route type"
1882 );
1883 return pingora_proxy::RangeType::None;
1884 }
1885
1886 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1888
1889 match &range_type {
1890 pingora_proxy::RangeType::None => {
1891 trace!(
1892 correlation_id = %ctx.trace_id,
1893 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1894 "No range request or not applicable"
1895 );
1896 }
1897 pingora_proxy::RangeType::Single(range) => {
1898 trace!(
1899 correlation_id = %ctx.trace_id,
1900 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1901 range_start = range.start,
1902 range_end = range.end,
1903 "Processing single-range request"
1904 );
1905 }
1906 pingora_proxy::RangeType::Multi(multi) => {
1907 trace!(
1908 correlation_id = %ctx.trace_id,
1909 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1910 range_count = multi.ranges.len(),
1911 "Processing multi-range request"
1912 );
1913 }
1914 pingora_proxy::RangeType::Invalid => {
1915 debug!(
1916 correlation_id = %ctx.trace_id,
1917 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1918 "Invalid range header"
1919 );
1920 }
1921 }
1922
1923 range_type
1924 }
1925
1926 async fn fail_to_proxy(
1929 &self,
1930 session: &mut Session,
1931 e: &Error,
1932 ctx: &mut Self::CTX,
1933 ) -> pingora_proxy::FailToProxy
1934 where
1935 Self::CTX: Send + Sync,
1936 {
1937 let error_code = match e.etype() {
1938 ErrorType::ConnectRefused => 503,
1940 ErrorType::ConnectTimedout => 504,
1941 ErrorType::ConnectNoRoute => 502,
1942
1943 ErrorType::ReadTimedout => 504,
1945 ErrorType::WriteTimedout => 504,
1946
1947 ErrorType::TLSHandshakeFailure => 502,
1949 ErrorType::InvalidCert => 502,
1950
1951 ErrorType::InvalidHTTPHeader => 400,
1953 ErrorType::H2Error => 502,
1954
1955 ErrorType::ConnectProxyFailure => 502,
1957 ErrorType::ConnectionClosed => 502,
1958
1959 ErrorType::HTTPStatus(status) => *status,
1961
1962 ErrorType::InternalError => {
1964 let error_str = e.to_string();
1966 if error_str.contains("upstream")
1967 || error_str.contains("DNS")
1968 || error_str.contains("resolve")
1969 {
1970 502
1971 } else {
1972 500
1973 }
1974 }
1975
1976 _ => 502,
1978 };
1979
1980 error!(
1981 correlation_id = %ctx.trace_id,
1982 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1983 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1984 error_type = ?e.etype(),
1985 error = %e,
1986 error_code = error_code,
1987 "Proxy error occurred"
1988 );
1989
1990 self.metrics
1992 .record_blocked_request(&format!("proxy_error_{}", error_code));
1993
1994 let error_message = match error_code {
1998 400 => "Bad Request",
1999 502 => "Bad Gateway",
2000 503 => "Service Unavailable",
2001 504 => "Gateway Timeout",
2002 _ => "Internal Server Error",
2003 };
2004
2005 let body = format!(
2007 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2008 error_code, error_message, ctx.trace_id
2009 );
2010
2011 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2013 header
2014 .insert_header("Content-Type", "application/json")
2015 .ok();
2016 header
2017 .insert_header("Content-Length", body.len().to_string())
2018 .ok();
2019 header
2020 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2021 .ok();
2022 header.insert_header("Connection", "close").ok();
2023
2024 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2026 warn!(
2027 correlation_id = %ctx.trace_id,
2028 error = %write_err,
2029 "Failed to write error response header"
2030 );
2031 } else {
2032 if let Err(write_err) = session
2034 .write_response_body(Some(bytes::Bytes::from(body)), true)
2035 .await
2036 {
2037 warn!(
2038 correlation_id = %ctx.trace_id,
2039 error = %write_err,
2040 "Failed to write error response body"
2041 );
2042 }
2043 }
2044
2045 pingora_proxy::FailToProxy {
2048 error_code,
2049 can_reuse_downstream: false,
2050 }
2051 }
2052
2053 fn error_while_proxy(
2059 &self,
2060 peer: &HttpPeer,
2061 session: &mut Session,
2062 e: Box<Error>,
2063 ctx: &mut Self::CTX,
2064 client_reused: bool,
2065 ) -> Box<Error> {
2066 let error_type = e.etype().clone();
2067 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2068
2069 let is_retryable = matches!(
2071 error_type,
2072 ErrorType::ConnectTimedout
2073 | ErrorType::ReadTimedout
2074 | ErrorType::WriteTimedout
2075 | ErrorType::ConnectionClosed
2076 | ErrorType::ConnectRefused
2077 );
2078
2079 warn!(
2081 correlation_id = %ctx.trace_id,
2082 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2083 upstream = %upstream_id,
2084 peer_address = %peer.address(),
2085 error_type = ?error_type,
2086 error = %e,
2087 client_reused = client_reused,
2088 is_retryable = is_retryable,
2089 "Error during proxy operation"
2090 );
2091
2092 let peer_address = peer.address().to_string();
2095 let upstream_pools = self.upstream_pools.clone();
2096 let upstream_id_owned = upstream_id.to_string();
2097 tokio::spawn(async move {
2098 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2099 pool.report_result(&peer_address, false).await;
2100 }
2101 });
2102
2103 self.metrics
2105 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2106
2107 let mut enhanced_error = e.more_context(format!(
2109 "Upstream: {}, Peer: {}, Attempts: {}",
2110 upstream_id,
2111 peer.address(),
2112 ctx.upstream_attempts
2113 ));
2114
2115 if is_retryable {
2120 let can_retry = if client_reused {
2121 !session.as_ref().retry_buffer_truncated()
2123 } else {
2124 true
2126 };
2127
2128 enhanced_error.retry.decide_reuse(can_retry);
2129
2130 if can_retry {
2131 debug!(
2132 correlation_id = %ctx.trace_id,
2133 upstream = %upstream_id,
2134 error_type = ?error_type,
2135 "Error is retryable, will attempt retry"
2136 );
2137 }
2138 } else {
2139 enhanced_error.retry.decide_reuse(false);
2141 }
2142
2143 enhanced_error
2144 }
2145
2146 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2147 self.reload_coordinator.dec_requests();
2149
2150 let duration = ctx.elapsed();
2151
2152 let status = session
2154 .response_written()
2155 .map(|r| r.status.as_u16())
2156 .unwrap_or(0);
2157
2158 if let (Some(ref peer_addr), Some(ref upstream_id)) =
2161 (&ctx.selected_upstream_address, &ctx.upstream)
2162 {
2163 let success = status > 0 && status < 500;
2165
2166 if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2167 pool.report_result_with_latency(peer_addr, success, Some(duration))
2168 .await;
2169 trace!(
2170 correlation_id = %ctx.trace_id,
2171 upstream = %upstream_id,
2172 peer_address = %peer_addr,
2173 success = success,
2174 duration_ms = duration.as_millis(),
2175 status = status,
2176 "Reported result to adaptive load balancer"
2177 );
2178 }
2179 }
2180
2181 if self.log_manager.access_log_enabled() {
2183 let access_entry = AccessLogEntry {
2184 timestamp: chrono::Utc::now().to_rfc3339(),
2185 trace_id: ctx.trace_id.clone(),
2186 method: ctx.method.clone(),
2187 path: ctx.path.clone(),
2188 query: ctx.query.clone(),
2189 protocol: "HTTP/1.1".to_string(),
2190 status,
2191 body_bytes: ctx.response_bytes,
2192 duration_ms: duration.as_millis() as u64,
2193 client_ip: ctx.client_ip.clone(),
2194 user_agent: ctx.user_agent.clone(),
2195 referer: ctx.referer.clone(),
2196 host: ctx.host.clone(),
2197 route_id: ctx.route_id.clone(),
2198 upstream: ctx.upstream.clone(),
2199 upstream_attempts: ctx.upstream_attempts,
2200 instance_id: self.app_state.instance_id.clone(),
2201 namespace: ctx.namespace.clone(),
2202 service: ctx.service.clone(),
2203 };
2204 self.log_manager.log_access(&access_entry);
2205 }
2206
2207 if tracing::enabled!(tracing::Level::DEBUG) {
2209 debug!(
2210 trace_id = %ctx.trace_id,
2211 method = %ctx.method,
2212 path = %ctx.path,
2213 route_id = ?ctx.route_id,
2214 upstream = ?ctx.upstream,
2215 status = status,
2216 duration_ms = duration.as_millis() as u64,
2217 upstream_attempts = ctx.upstream_attempts,
2218 error = ?_error.map(|e| e.to_string()),
2219 "Request completed"
2220 );
2221 }
2222
2223 if ctx.is_websocket_upgrade && status == 101 {
2225 info!(
2226 trace_id = %ctx.trace_id,
2227 route_id = ?ctx.route_id,
2228 upstream = ?ctx.upstream,
2229 client_ip = %ctx.client_ip,
2230 "WebSocket connection established"
2231 );
2232 }
2233
2234 if let Some(span) = ctx.otel_span.take() {
2236 span.end();
2237 }
2238 }
2239}
2240
2241impl SentinelProxy {
2246 async fn process_body_chunk_streaming(
2248 &self,
2249 body: &mut Option<Bytes>,
2250 end_of_stream: bool,
2251 ctx: &mut RequestContext,
2252 ) -> Result<(), Box<Error>> {
2253 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2255 let chunk_index = ctx.request_body_chunk_index;
2256 ctx.request_body_chunk_index += 1;
2257 ctx.body_bytes_inspected += chunk_data.len() as u64;
2258
2259 debug!(
2260 correlation_id = %ctx.trace_id,
2261 chunk_index = chunk_index,
2262 chunk_size = chunk_data.len(),
2263 end_of_stream = end_of_stream,
2264 "Streaming body chunk to agents"
2265 );
2266
2267 let agent_ctx = crate::agents::AgentCallContext {
2269 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2270 metadata: sentinel_agent_protocol::RequestMetadata {
2271 correlation_id: ctx.trace_id.clone(),
2272 request_id: ctx.trace_id.clone(),
2273 client_ip: ctx.client_ip.clone(),
2274 client_port: 0,
2275 server_name: ctx.host.clone(),
2276 protocol: "HTTP/1.1".to_string(),
2277 tls_version: None,
2278 tls_cipher: None,
2279 route_id: ctx.route_id.clone(),
2280 upstream_id: ctx.upstream.clone(),
2281 timestamp: chrono::Utc::now().to_rfc3339(),
2282 traceparent: ctx.traceparent(),
2283 },
2284 route_id: ctx.route_id.clone(),
2285 upstream_id: ctx.upstream.clone(),
2286 request_body: None, response_body: None,
2288 };
2289
2290 let agent_ids = ctx.body_inspection_agents.clone();
2291 let total_size = None; match self
2294 .agent_manager
2295 .process_request_body_streaming(
2296 &agent_ctx,
2297 &chunk_data,
2298 end_of_stream,
2299 chunk_index,
2300 ctx.body_bytes_inspected as usize,
2301 total_size,
2302 &agent_ids,
2303 )
2304 .await
2305 {
2306 Ok(decision) => {
2307 ctx.agent_needs_more = decision.needs_more;
2309
2310 if let Some(ref mutation) = decision.request_body_mutation {
2312 if !mutation.is_pass_through() {
2313 if mutation.is_drop() {
2314 *body = None;
2316 trace!(
2317 correlation_id = %ctx.trace_id,
2318 chunk_index = chunk_index,
2319 "Agent dropped body chunk"
2320 );
2321 } else if let Some(ref new_data) = mutation.data {
2322 *body = Some(Bytes::from(new_data.clone()));
2324 trace!(
2325 correlation_id = %ctx.trace_id,
2326 chunk_index = chunk_index,
2327 original_size = chunk_data.len(),
2328 new_size = new_data.len(),
2329 "Agent mutated body chunk"
2330 );
2331 }
2332 }
2333 }
2334
2335 if !decision.needs_more && !decision.is_allow() {
2337 warn!(
2338 correlation_id = %ctx.trace_id,
2339 action = ?decision.action,
2340 "Agent blocked request body"
2341 );
2342 self.metrics.record_blocked_request("agent_body_inspection");
2343
2344 let (status, message) = match &decision.action {
2345 crate::agents::AgentAction::Block { status, body, .. } => (
2346 *status,
2347 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2348 ),
2349 _ => (403, "Forbidden".to_string()),
2350 };
2351
2352 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2353 }
2354
2355 trace!(
2356 correlation_id = %ctx.trace_id,
2357 needs_more = decision.needs_more,
2358 "Agent processed body chunk"
2359 );
2360 }
2361 Err(e) => {
2362 let fail_closed = ctx
2363 .route_config
2364 .as_ref()
2365 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2366 .unwrap_or(false);
2367
2368 if fail_closed {
2369 error!(
2370 correlation_id = %ctx.trace_id,
2371 error = %e,
2372 "Agent streaming body inspection failed, blocking (fail-closed)"
2373 );
2374 return Err(Error::explain(
2375 ErrorType::HTTPStatus(503),
2376 "Service unavailable",
2377 ));
2378 } else {
2379 warn!(
2380 correlation_id = %ctx.trace_id,
2381 error = %e,
2382 "Agent streaming body inspection failed, allowing (fail-open)"
2383 );
2384 }
2385 }
2386 }
2387
2388 Ok(())
2389 }
2390
2391 async fn send_buffered_body_to_agents(
2393 &self,
2394 end_of_stream: bool,
2395 ctx: &mut RequestContext,
2396 ) -> Result<(), Box<Error>> {
2397 debug!(
2398 correlation_id = %ctx.trace_id,
2399 buffer_size = ctx.body_buffer.len(),
2400 end_of_stream = end_of_stream,
2401 agent_count = ctx.body_inspection_agents.len(),
2402 decompression_enabled = ctx.decompression_enabled,
2403 "Sending buffered body to agents for inspection"
2404 );
2405
2406 let body_for_inspection = if ctx.decompression_enabled {
2408 if let Some(ref encoding) = ctx.body_content_encoding {
2409 let config = crate::decompression::DecompressionConfig {
2410 max_ratio: ctx.max_decompression_ratio,
2411 max_output_bytes: ctx.max_decompression_bytes,
2412 };
2413
2414 match crate::decompression::decompress_body(
2415 &ctx.body_buffer,
2416 encoding,
2417 &config,
2418 ) {
2419 Ok(result) => {
2420 ctx.body_was_decompressed = true;
2421 self.metrics
2422 .record_decompression_success(encoding, result.ratio);
2423 debug!(
2424 correlation_id = %ctx.trace_id,
2425 encoding = %encoding,
2426 compressed_size = result.compressed_size,
2427 decompressed_size = result.decompressed_size,
2428 ratio = result.ratio,
2429 "Body decompressed for agent inspection"
2430 );
2431 result.data
2432 }
2433 Err(e) => {
2434 let failure_reason = match &e {
2436 crate::decompression::DecompressionError::RatioExceeded { .. } => {
2437 "ratio_exceeded"
2438 }
2439 crate::decompression::DecompressionError::SizeExceeded { .. } => {
2440 "size_exceeded"
2441 }
2442 crate::decompression::DecompressionError::InvalidData { .. } => {
2443 "invalid_data"
2444 }
2445 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2446 "unsupported"
2447 }
2448 crate::decompression::DecompressionError::IoError(_) => "io_error",
2449 };
2450 self.metrics
2451 .record_decompression_failure(encoding, failure_reason);
2452
2453 let fail_closed = ctx
2455 .route_config
2456 .as_ref()
2457 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2458 .unwrap_or(false);
2459
2460 if fail_closed {
2461 error!(
2462 correlation_id = %ctx.trace_id,
2463 error = %e,
2464 encoding = %encoding,
2465 "Decompression failed, blocking (fail-closed)"
2466 );
2467 return Err(Error::explain(
2468 ErrorType::HTTPStatus(400),
2469 "Invalid compressed body",
2470 ));
2471 } else {
2472 warn!(
2473 correlation_id = %ctx.trace_id,
2474 error = %e,
2475 encoding = %encoding,
2476 "Decompression failed, sending compressed body (fail-open)"
2477 );
2478 ctx.body_buffer.clone()
2479 }
2480 }
2481 }
2482 } else {
2483 ctx.body_buffer.clone()
2484 }
2485 } else {
2486 ctx.body_buffer.clone()
2487 };
2488
2489 let agent_ctx = crate::agents::AgentCallContext {
2490 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2491 metadata: sentinel_agent_protocol::RequestMetadata {
2492 correlation_id: ctx.trace_id.clone(),
2493 request_id: ctx.trace_id.clone(),
2494 client_ip: ctx.client_ip.clone(),
2495 client_port: 0,
2496 server_name: ctx.host.clone(),
2497 protocol: "HTTP/1.1".to_string(),
2498 tls_version: None,
2499 tls_cipher: None,
2500 route_id: ctx.route_id.clone(),
2501 upstream_id: ctx.upstream.clone(),
2502 timestamp: chrono::Utc::now().to_rfc3339(),
2503 traceparent: ctx.traceparent(),
2504 },
2505 route_id: ctx.route_id.clone(),
2506 upstream_id: ctx.upstream.clone(),
2507 request_body: Some(body_for_inspection.clone()),
2508 response_body: None,
2509 };
2510
2511 let agent_ids = ctx.body_inspection_agents.clone();
2512 match self
2513 .agent_manager
2514 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2515 .await
2516 {
2517 Ok(decision) => {
2518 if !decision.is_allow() {
2519 warn!(
2520 correlation_id = %ctx.trace_id,
2521 action = ?decision.action,
2522 "Agent blocked request body"
2523 );
2524 self.metrics.record_blocked_request("agent_body_inspection");
2525
2526 let (status, message) = match &decision.action {
2527 crate::agents::AgentAction::Block { status, body, .. } => (
2528 *status,
2529 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2530 ),
2531 _ => (403, "Forbidden".to_string()),
2532 };
2533
2534 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2535 }
2536
2537 trace!(
2538 correlation_id = %ctx.trace_id,
2539 "Agent allowed request body"
2540 );
2541 }
2542 Err(e) => {
2543 let fail_closed = ctx
2544 .route_config
2545 .as_ref()
2546 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2547 .unwrap_or(false);
2548
2549 if fail_closed {
2550 error!(
2551 correlation_id = %ctx.trace_id,
2552 error = %e,
2553 "Agent body inspection failed, blocking (fail-closed)"
2554 );
2555 return Err(Error::explain(
2556 ErrorType::HTTPStatus(503),
2557 "Service unavailable",
2558 ));
2559 } else {
2560 warn!(
2561 correlation_id = %ctx.trace_id,
2562 error = %e,
2563 "Agent body inspection failed, allowing (fail-open)"
2564 );
2565 }
2566 }
2567 }
2568
2569 Ok(())
2570 }
2571}