1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
23use crate::rate_limit::HeaderAccessor;
24use crate::routing::RequestInfo;
25
26use super::context::RequestContext;
27use super::SentinelProxy;
28
29struct NoHeaderAccessor;
31impl HeaderAccessor for NoHeaderAccessor {
32 fn get_header(&self, _name: &str) -> Option<String> {
33 None
34 }
35}
36
37#[async_trait]
38impl ProxyHttp for SentinelProxy {
39 type CTX = RequestContext;
40
41 fn new_ctx(&self) -> Self::CTX {
42 RequestContext::new()
43 }
44
45 fn fail_to_connect(
46 &self,
47 _session: &mut Session,
48 peer: &HttpPeer,
49 ctx: &mut Self::CTX,
50 e: Box<Error>,
51 ) -> Box<Error> {
52 error!(
53 correlation_id = %ctx.trace_id,
54 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
55 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
56 peer_address = %peer.address(),
57 error = %e,
58 "Failed to connect to upstream peer"
59 );
60 e
62 }
63
64 async fn early_request_filter(
67 &self,
68 session: &mut Session,
69 ctx: &mut Self::CTX,
70 ) -> Result<(), Box<Error>> {
71 let req_header = session.req_header();
73 let method = req_header.method.as_str();
74 let path = req_header.uri.path();
75 let host = req_header
76 .headers
77 .get("host")
78 .and_then(|h| h.to_str().ok())
79 .unwrap_or("");
80
81 ctx.method = method.to_string();
82 ctx.path = path.to_string();
83 ctx.host = Some(host.to_string());
84
85 let route_match = {
87 let route_matcher = self.route_matcher.read();
88 let request_info = RequestInfo::new(method, path, host);
89 match route_matcher.match_request(&request_info) {
90 Some(m) => m,
91 None => return Ok(()), }
93 };
94
95 ctx.trace_id = self.get_trace_id(session);
96 ctx.route_id = Some(route_match.route_id.to_string());
97 ctx.route_config = Some(route_match.config.clone());
98
99 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
101 trace!(
102 correlation_id = %ctx.trace_id,
103 route_id = %route_match.route_id,
104 builtin_handler = ?route_match.config.builtin_handler,
105 "Handling builtin route in early_request_filter"
106 );
107
108 let handled = self
110 .handle_builtin_route(session, ctx, &route_match)
111 .await?;
112
113 if handled {
114 return Err(Error::explain(
116 ErrorType::InternalError,
117 "Builtin handler complete",
118 ));
119 }
120 }
121
122 Ok(())
123 }
124
125 async fn upstream_peer(
126 &self,
127 session: &mut Session,
128 ctx: &mut Self::CTX,
129 ) -> Result<Box<HttpPeer>, Box<Error>> {
130 self.reload_coordinator.inc_requests();
132
133 if ctx.config.is_none() {
135 ctx.config = Some(self.config_manager.current());
136 }
137
138 if ctx.client_ip.is_empty() {
140 ctx.client_ip = session
141 .client_addr()
142 .map(|a| a.to_string())
143 .unwrap_or_else(|| "unknown".to_string());
144 }
145
146 let req_header = session.req_header();
147
148 if ctx.method.is_empty() {
150 ctx.method = req_header.method.to_string();
151 ctx.path = req_header.uri.path().to_string();
152 ctx.query = req_header.uri.query().map(|q| q.to_string());
153 ctx.host = req_header
154 .headers
155 .get("host")
156 .and_then(|v| v.to_str().ok())
157 .map(|s| s.to_string());
158 }
159 ctx.user_agent = req_header
160 .headers
161 .get("user-agent")
162 .and_then(|v| v.to_str().ok())
163 .map(|s| s.to_string());
164 ctx.referer = req_header
165 .headers
166 .get("referer")
167 .and_then(|v| v.to_str().ok())
168 .map(|s| s.to_string());
169
170 trace!(
171 correlation_id = %ctx.trace_id,
172 client_ip = %ctx.client_ip,
173 "Request received, initializing context"
174 );
175
176 let route_match = if let Some(ref route_config) = ctx.route_config {
178 let route_id = ctx.route_id.as_deref().unwrap_or("");
179 crate::routing::RouteMatch {
180 route_id: sentinel_common::RouteId::new(route_id),
181 config: route_config.clone(),
182 }
183 } else {
184 let (match_result, route_duration) = {
186 let route_matcher = self.route_matcher.read();
187 let host = ctx.host.as_deref().unwrap_or("");
188
189 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
191
192 if route_matcher.needs_headers() {
194 request_info = request_info
195 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
196 }
197
198 if route_matcher.needs_query_params() {
200 request_info =
201 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
202 }
203
204 trace!(
205 correlation_id = %ctx.trace_id,
206 method = %request_info.method,
207 path = %request_info.path,
208 host = %request_info.host,
209 "Built request info for route matching"
210 );
211
212 let route_start = std::time::Instant::now();
213 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
214 warn!(
215 correlation_id = %ctx.trace_id,
216 method = %request_info.method,
217 path = %request_info.path,
218 host = %request_info.host,
219 "No matching route found for request"
220 );
221 Error::explain(ErrorType::InternalError, "No matching route found")
222 })?;
223 let route_duration = route_start.elapsed();
224 (route_match, route_duration)
226 };
227
228 ctx.route_id = Some(match_result.route_id.to_string());
229 ctx.route_config = Some(match_result.config.clone());
230
231 trace!(
232 correlation_id = %ctx.trace_id,
233 route_id = %match_result.route_id,
234 route_duration_us = route_duration.as_micros(),
235 service_type = ?match_result.config.service_type,
236 "Route matched"
237 );
238 match_result
239 };
240
241 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
243 trace!(
244 correlation_id = %ctx.trace_id,
245 route_id = %route_match.route_id,
246 builtin_handler = ?route_match.config.builtin_handler,
247 "Route type is builtin, skipping upstream"
248 );
249 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
251 return Err(Error::explain(
253 ErrorType::InternalError,
254 "Builtin handler handled in request_filter",
255 ));
256 }
257
258 if route_match.config.service_type == sentinel_config::ServiceType::Static {
260 trace!(
261 correlation_id = %ctx.trace_id,
262 route_id = %route_match.route_id,
263 "Route type is static, checking for static server"
264 );
265 if self
267 .static_servers
268 .get(route_match.route_id.as_str())
269 .await
270 .is_some()
271 {
272 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
274 info!(
275 correlation_id = %ctx.trace_id,
276 route_id = %route_match.route_id,
277 path = %ctx.path,
278 "Serving static file"
279 );
280 return Err(Error::explain(
282 ErrorType::InternalError,
283 "Static file serving handled in request_filter",
284 ));
285 }
286 }
287
288 if let Some(ref upstream) = route_match.config.upstream {
290 ctx.upstream = Some(upstream.clone());
291 trace!(
292 correlation_id = %ctx.trace_id,
293 route_id = %route_match.route_id,
294 upstream = %upstream,
295 "Upstream configured for route"
296 );
297 } else {
298 error!(
299 correlation_id = %ctx.trace_id,
300 route_id = %route_match.route_id,
301 "Route has no upstream configured"
302 );
303 return Err(Error::explain(
304 ErrorType::InternalError,
305 format!(
306 "Route '{}' has no upstream configured",
307 route_match.route_id
308 ),
309 ));
310 }
311
312 debug!(
313 correlation_id = %ctx.trace_id,
314 route_id = %route_match.route_id,
315 upstream = ?ctx.upstream,
316 method = %req_header.method,
317 path = %req_header.uri.path(),
318 host = ctx.host.as_deref().unwrap_or("-"),
319 client_ip = %ctx.client_ip,
320 "Processing request"
321 );
322
323 if ctx
325 .upstream
326 .as_ref()
327 .is_some_and(|u| u.starts_with("_static_"))
328 {
329 return Err(Error::explain(
331 ErrorType::InternalError,
332 "Static route should be handled in request_filter",
333 ));
334 }
335
336 let upstream_name = ctx
337 .upstream
338 .as_ref()
339 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
340
341 trace!(
342 correlation_id = %ctx.trace_id,
343 upstream = %upstream_name,
344 "Looking up upstream pool"
345 );
346
347 let pool = self
348 .upstream_pools
349 .get(upstream_name)
350 .await
351 .ok_or_else(|| {
352 error!(
353 correlation_id = %ctx.trace_id,
354 upstream = %upstream_name,
355 "Upstream pool not found"
356 );
357 Error::explain(
358 ErrorType::InternalError,
359 format!("Upstream pool '{}' not found", upstream_name),
360 )
361 })?;
362
363 let max_retries = route_match
365 .config
366 .retry_policy
367 .as_ref()
368 .map(|r| r.max_attempts)
369 .unwrap_or(1);
370
371 trace!(
372 correlation_id = %ctx.trace_id,
373 upstream = %upstream_name,
374 max_retries = max_retries,
375 "Starting upstream peer selection"
376 );
377
378 let mut last_error = None;
379 let selection_start = std::time::Instant::now();
380
381 for attempt in 1..=max_retries {
382 ctx.upstream_attempts = attempt;
383
384 trace!(
385 correlation_id = %ctx.trace_id,
386 upstream = %upstream_name,
387 attempt = attempt,
388 max_retries = max_retries,
389 "Attempting to select upstream peer"
390 );
391
392 match pool.select_peer(None).await {
393 Ok(peer) => {
394 let selection_duration = selection_start.elapsed();
395 debug!(
396 correlation_id = %ctx.trace_id,
397 upstream = %upstream_name,
398 peer_address = %peer.address(),
399 attempt = attempt,
400 selection_duration_us = selection_duration.as_micros(),
401 "Selected upstream peer"
402 );
403 return Ok(Box::new(peer));
404 }
405 Err(e) => {
406 warn!(
407 correlation_id = %ctx.trace_id,
408 upstream = %upstream_name,
409 attempt = attempt,
410 max_retries = max_retries,
411 error = %e,
412 "Failed to select upstream peer"
413 );
414 last_error = Some(e);
415
416 if attempt < max_retries {
417 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
419 trace!(
420 correlation_id = %ctx.trace_id,
421 backoff_ms = backoff.as_millis(),
422 "Backing off before retry"
423 );
424 sleep(backoff).await;
425 }
426 }
427 }
428 }
429
430 let selection_duration = selection_start.elapsed();
431 error!(
432 correlation_id = %ctx.trace_id,
433 upstream = %upstream_name,
434 attempts = max_retries,
435 selection_duration_ms = selection_duration.as_millis(),
436 last_error = ?last_error,
437 "All upstream selection attempts failed"
438 );
439
440 Err(Error::explain(
441 ErrorType::InternalError,
442 format!("All upstream attempts failed: {:?}", last_error),
443 ))
444 }
445
446 async fn request_filter(
447 &self,
448 session: &mut Session,
449 ctx: &mut Self::CTX,
450 ) -> Result<bool, Box<Error>> {
451 trace!(
452 correlation_id = %ctx.trace_id,
453 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
454 "Starting request filter phase"
455 );
456
457 if let Some(route_id) = ctx.route_id.as_deref() {
460 if self.rate_limit_manager.has_route_limiter(route_id) {
461 let rate_result = self.rate_limit_manager.check(
462 route_id,
463 &ctx.client_ip,
464 &ctx.path,
465 Option::<&NoHeaderAccessor>::None,
466 );
467
468 if rate_result.limit > 0 {
470 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
471 limit: rate_result.limit,
472 remaining: rate_result.remaining,
473 reset_at: rate_result.reset_at,
474 });
475 }
476
477 if !rate_result.allowed {
478 use sentinel_config::RateLimitAction;
479
480 match rate_result.action {
481 RateLimitAction::Reject => {
482 warn!(
483 correlation_id = %ctx.trace_id,
484 route_id = route_id,
485 client_ip = %ctx.client_ip,
486 limiter = %rate_result.limiter,
487 limit = rate_result.limit,
488 remaining = rate_result.remaining,
489 "Request rate limited"
490 );
491 self.metrics.record_blocked_request("rate_limited");
492
493 let audit_entry = AuditLogEntry::rate_limited(
495 &ctx.trace_id,
496 &ctx.method,
497 &ctx.path,
498 &ctx.client_ip,
499 &rate_result.limiter,
500 )
501 .with_route_id(route_id)
502 .with_status_code(rate_result.status_code);
503 self.log_manager.log_audit(&audit_entry);
504
505 let body = rate_result
507 .message
508 .unwrap_or_else(|| "Rate limit exceeded".to_string());
509
510 let retry_after = rate_result.reset_at.saturating_sub(
512 std::time::SystemTime::now()
513 .duration_since(std::time::UNIX_EPOCH)
514 .unwrap_or_default()
515 .as_secs(),
516 );
517 crate::http_helpers::write_rate_limit_error(
518 session,
519 rate_result.status_code,
520 &body,
521 rate_result.limit,
522 rate_result.remaining,
523 rate_result.reset_at,
524 retry_after,
525 )
526 .await?;
527 return Ok(true); }
529 RateLimitAction::LogOnly => {
530 debug!(
531 correlation_id = %ctx.trace_id,
532 route_id = route_id,
533 "Rate limit exceeded (log only mode)"
534 );
535 }
537 RateLimitAction::Delay => {
538 if let Some(delay_ms) = rate_result.suggested_delay_ms {
540 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
542
543 if actual_delay > 0 {
544 debug!(
545 correlation_id = %ctx.trace_id,
546 route_id = route_id,
547 suggested_delay_ms = delay_ms,
548 max_delay_ms = rate_result.max_delay_ms,
549 actual_delay_ms = actual_delay,
550 "Applying rate limit delay"
551 );
552
553 tokio::time::sleep(std::time::Duration::from_millis(
554 actual_delay,
555 ))
556 .await;
557 }
558 }
559 }
561 }
562 }
563 }
564 }
565
566 if let Some(route_id) = ctx.route_id.as_deref() {
568 if let Some(ref route_config) = ctx.route_config {
569 for filter_id in &route_config.filters {
570 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
571 ctx.geo_country_code = result.country_code.clone();
573 ctx.geo_lookup_performed = true;
574
575 if !result.allowed {
576 warn!(
577 correlation_id = %ctx.trace_id,
578 route_id = route_id,
579 client_ip = %ctx.client_ip,
580 country = ?result.country_code,
581 filter_id = %filter_id,
582 "Request blocked by geo filter"
583 );
584 self.metrics.record_blocked_request("geo_blocked");
585
586 let audit_entry = AuditLogEntry::new(
588 &ctx.trace_id,
589 AuditEventType::Blocked,
590 &ctx.method,
591 &ctx.path,
592 &ctx.client_ip,
593 )
594 .with_route_id(route_id)
595 .with_status_code(result.status_code)
596 .with_reason(format!(
597 "Geo blocked: country={}, filter={}",
598 result.country_code.as_deref().unwrap_or("unknown"),
599 filter_id
600 ));
601 self.log_manager.log_audit(&audit_entry);
602
603 let body = result
605 .block_message
606 .unwrap_or_else(|| "Access denied".to_string());
607
608 crate::http_helpers::write_error(
609 session,
610 result.status_code,
611 &body,
612 "text/plain",
613 )
614 .await?;
615 return Ok(true); }
617
618 break;
620 }
621 }
622 }
623 }
624
625 let is_websocket_upgrade = session
627 .req_header()
628 .headers
629 .get(http::header::UPGRADE)
630 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
631 .unwrap_or(false);
632
633 if is_websocket_upgrade {
634 ctx.is_websocket_upgrade = true;
635
636 if let Some(ref route_config) = ctx.route_config {
638 if !route_config.websocket {
639 warn!(
640 correlation_id = %ctx.trace_id,
641 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
642 client_ip = %ctx.client_ip,
643 "WebSocket upgrade rejected: not enabled for route"
644 );
645
646 self.metrics.record_blocked_request("websocket_not_enabled");
647
648 let audit_entry = AuditLogEntry::new(
650 &ctx.trace_id,
651 AuditEventType::Blocked,
652 &ctx.method,
653 &ctx.path,
654 &ctx.client_ip,
655 )
656 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
657 .with_action("websocket_rejected")
658 .with_reason("WebSocket not enabled for route");
659 self.log_manager.log_audit(&audit_entry);
660
661 crate::http_helpers::write_error(
663 session,
664 403,
665 "WebSocket not enabled for this route",
666 "text/plain",
667 )
668 .await?;
669 return Ok(true); }
671
672 debug!(
673 correlation_id = %ctx.trace_id,
674 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
675 "WebSocket upgrade request allowed"
676 );
677
678 if route_config.websocket_inspection {
680 let has_compression = session
682 .req_header()
683 .headers
684 .get("Sec-WebSocket-Extensions")
685 .and_then(|v| v.to_str().ok())
686 .map(|s| s.contains("permessage-deflate"))
687 .unwrap_or(false);
688
689 if has_compression {
690 debug!(
691 correlation_id = %ctx.trace_id,
692 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
693 "WebSocket inspection skipped: permessage-deflate negotiated"
694 );
695 ctx.websocket_skip_inspection = true;
696 } else {
697 ctx.websocket_inspection_enabled = true;
698
699 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
701 sentinel_agent_protocol::EventType::WebSocketFrame,
702 );
703
704 debug!(
705 correlation_id = %ctx.trace_id,
706 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
707 agent_count = ctx.websocket_inspection_agents.len(),
708 "WebSocket frame inspection enabled"
709 );
710 }
711 }
712 }
713 }
714
715 if let Some(route_config) = ctx.route_config.clone() {
718 if route_config.service_type == sentinel_config::ServiceType::Static {
719 trace!(
720 correlation_id = %ctx.trace_id,
721 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
722 "Handling static file route"
723 );
724 let route_match = crate::routing::RouteMatch {
726 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
727 config: route_config.clone(),
728 };
729 return self.handle_static_route(session, ctx, &route_match).await;
730 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
731 trace!(
732 correlation_id = %ctx.trace_id,
733 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
734 builtin_handler = ?route_config.builtin_handler,
735 "Handling builtin route"
736 );
737 let route_match = crate::routing::RouteMatch {
739 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
740 config: route_config.clone(),
741 };
742 return self.handle_builtin_route(session, ctx, &route_match).await;
743 }
744 }
745
746 if let Some(route_id) = ctx.route_id.clone() {
748 if let Some(validator) = self.validators.get(&route_id).await {
749 trace!(
750 correlation_id = %ctx.trace_id,
751 route_id = %route_id,
752 "Running API schema validation"
753 );
754 if let Some(result) = self
755 .validate_api_request(session, ctx, &route_id, &validator)
756 .await?
757 {
758 debug!(
759 correlation_id = %ctx.trace_id,
760 route_id = %route_id,
761 validation_passed = result,
762 "API validation complete"
763 );
764 return Ok(result);
765 }
766 }
767 }
768
769 let client_addr = session
771 .client_addr()
772 .map(|a| format!("{}", a))
773 .unwrap_or_else(|| "unknown".to_string());
774 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
775
776 let req_header = session.req_header_mut();
777
778 req_header
780 .insert_header("X-Correlation-Id", &ctx.trace_id)
781 .ok();
782 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
783
784 let config = ctx
786 .config
787 .get_or_insert_with(|| self.config_manager.current());
788
789 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
794 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
795 && header_count > config.limits.max_header_count
796 {
797 warn!(
798 correlation_id = %ctx.trace_id,
799 header_count = header_count,
800 limit = config.limits.max_header_count,
801 "Request blocked: exceeds header count limit"
802 );
803
804 self.metrics.record_blocked_request("header_count_exceeded");
805 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
806 }
807
808 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
810 let total_header_size: usize = req_header
811 .headers
812 .iter()
813 .map(|(k, v)| k.as_str().len() + v.len())
814 .sum();
815
816 if total_header_size > config.limits.max_header_size_bytes {
817 warn!(
818 correlation_id = %ctx.trace_id,
819 header_size = total_header_size,
820 limit = config.limits.max_header_size_bytes,
821 "Request blocked: exceeds header size limit"
822 );
823
824 self.metrics.record_blocked_request("header_size_exceeded");
825 return Err(Error::explain(
826 ErrorType::InternalError,
827 "Headers too large",
828 ));
829 }
830 }
831
832 trace!(
834 correlation_id = %ctx.trace_id,
835 "Processing request through agents"
836 );
837 if let Err(e) = self
838 .process_agents(session, ctx, &client_addr, client_port)
839 .await
840 {
841 if let ErrorType::HTTPStatus(status) = e.etype() {
844 let error_msg = e.to_string();
846 let body = error_msg
847 .split("context:")
848 .nth(1)
849 .map(|s| s.trim())
850 .unwrap_or("Request blocked");
851 debug!(
852 correlation_id = %ctx.trace_id,
853 status = status,
854 body = %body,
855 "Sending HTTP error response for agent block"
856 );
857 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
858 return Ok(true); }
860 return Err(e);
862 }
863
864 trace!(
865 correlation_id = %ctx.trace_id,
866 "Request filter phase complete, forwarding to upstream"
867 );
868
869 Ok(false) }
871
872 async fn request_body_filter(
879 &self,
880 _session: &mut Session,
881 body: &mut Option<Bytes>,
882 end_of_stream: bool,
883 ctx: &mut Self::CTX,
884 ) -> Result<(), Box<Error>> {
885 use sentinel_config::BodyStreamingMode;
886
887 if ctx.is_websocket_upgrade {
889 if let Some(ref handler) = ctx.websocket_handler {
890 let result = handler.process_client_data(body.take()).await;
891 match result {
892 crate::websocket::ProcessResult::Forward(data) => {
893 *body = data;
894 }
895 crate::websocket::ProcessResult::Close(reason) => {
896 warn!(
897 correlation_id = %ctx.trace_id,
898 code = reason.code,
899 reason = %reason.reason,
900 "WebSocket connection closed by agent (client->server)"
901 );
902 return Err(Error::explain(
904 ErrorType::InternalError,
905 format!("WebSocket closed: {} {}", reason.code, reason.reason),
906 ));
907 }
908 }
909 }
910 return Ok(());
912 }
913
914 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
916 if chunk_len > 0 {
917 ctx.request_body_bytes += chunk_len as u64;
918
919 trace!(
920 correlation_id = %ctx.trace_id,
921 chunk_size = chunk_len,
922 total_body_bytes = ctx.request_body_bytes,
923 end_of_stream = end_of_stream,
924 streaming_mode = ?ctx.request_body_streaming_mode,
925 "Processing request body chunk"
926 );
927
928 let config = ctx
930 .config
931 .get_or_insert_with(|| self.config_manager.current());
932 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
933 warn!(
934 correlation_id = %ctx.trace_id,
935 body_bytes = ctx.request_body_bytes,
936 limit = config.limits.max_body_size_bytes,
937 "Request body size limit exceeded"
938 );
939 self.metrics.record_blocked_request("body_size_exceeded");
940 return Err(Error::explain(
941 ErrorType::InternalError,
942 "Request body too large",
943 ));
944 }
945 }
946
947 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
949 let config = ctx
950 .config
951 .get_or_insert_with(|| self.config_manager.current());
952 let max_inspection_bytes = config
953 .waf
954 .as_ref()
955 .map(|w| w.body_inspection.max_inspection_bytes as u64)
956 .unwrap_or(1024 * 1024);
957
958 match ctx.request_body_streaming_mode {
959 BodyStreamingMode::Stream => {
960 if body.is_some() {
962 self.process_body_chunk_streaming(body, end_of_stream, ctx)
963 .await?;
964 } else if end_of_stream && ctx.agent_needs_more {
965 self.process_body_chunk_streaming(body, end_of_stream, ctx)
967 .await?;
968 }
969 }
970 BodyStreamingMode::Hybrid { buffer_threshold } => {
971 if ctx.body_bytes_inspected < buffer_threshold as u64 {
973 if let Some(ref chunk) = body {
975 let bytes_to_buffer = std::cmp::min(
976 chunk.len(),
977 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
978 );
979 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
980 ctx.body_bytes_inspected += bytes_to_buffer as u64;
981
982 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
984 {
985 self.send_buffered_body_to_agents(
987 end_of_stream && chunk.len() == bytes_to_buffer,
988 ctx,
989 )
990 .await?;
991 ctx.body_buffer.clear();
992
993 if bytes_to_buffer < chunk.len() {
995 let remaining = chunk.slice(bytes_to_buffer..);
996 let mut remaining_body = Some(remaining);
997 self.process_body_chunk_streaming(
998 &mut remaining_body,
999 end_of_stream,
1000 ctx,
1001 )
1002 .await?;
1003 }
1004 }
1005 }
1006 } else {
1007 self.process_body_chunk_streaming(body, end_of_stream, ctx)
1009 .await?;
1010 }
1011 }
1012 BodyStreamingMode::Buffer => {
1013 if let Some(ref chunk) = body {
1015 if ctx.body_bytes_inspected < max_inspection_bytes {
1016 let bytes_to_inspect = std::cmp::min(
1017 chunk.len() as u64,
1018 max_inspection_bytes - ctx.body_bytes_inspected,
1019 ) as usize;
1020
1021 ctx.body_buffer
1022 .extend_from_slice(&chunk[..bytes_to_inspect]);
1023 ctx.body_bytes_inspected += bytes_to_inspect as u64;
1024
1025 trace!(
1026 correlation_id = %ctx.trace_id,
1027 bytes_inspected = ctx.body_bytes_inspected,
1028 max_inspection_bytes = max_inspection_bytes,
1029 buffer_size = ctx.body_buffer.len(),
1030 "Buffering body for agent inspection"
1031 );
1032 }
1033 }
1034
1035 let should_send =
1037 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1038 if should_send && !ctx.body_buffer.is_empty() {
1039 self.send_buffered_body_to_agents(end_of_stream, ctx)
1040 .await?;
1041 ctx.body_buffer.clear();
1042 }
1043 }
1044 }
1045 }
1046
1047 if end_of_stream {
1048 trace!(
1049 correlation_id = %ctx.trace_id,
1050 total_body_bytes = ctx.request_body_bytes,
1051 bytes_inspected = ctx.body_bytes_inspected,
1052 "Request body complete"
1053 );
1054 }
1055
1056 Ok(())
1057 }
1058
1059 async fn response_filter(
1060 &self,
1061 _session: &mut Session,
1062 upstream_response: &mut ResponseHeader,
1063 ctx: &mut Self::CTX,
1064 ) -> Result<(), Box<Error>> {
1065 let status = upstream_response.status.as_u16();
1066 let duration = ctx.elapsed();
1067
1068 trace!(
1069 correlation_id = %ctx.trace_id,
1070 status = status,
1071 "Starting response filter phase"
1072 );
1073
1074 if status == 101 && ctx.is_websocket_upgrade {
1076 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1077 let inspector = crate::websocket::WebSocketInspector::with_metrics(
1079 self.agent_manager.clone(),
1080 ctx.route_id
1081 .clone()
1082 .unwrap_or_else(|| "unknown".to_string()),
1083 ctx.trace_id.clone(),
1084 ctx.client_ip.clone(),
1085 100, Some(self.metrics.clone()),
1087 );
1088
1089 let handler = crate::websocket::WebSocketHandler::new(
1090 std::sync::Arc::new(inspector),
1091 1024 * 1024, );
1093
1094 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1095
1096 info!(
1097 correlation_id = %ctx.trace_id,
1098 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1099 agent_count = ctx.websocket_inspection_agents.len(),
1100 "WebSocket upgrade successful, frame inspection enabled"
1101 );
1102 } else if ctx.websocket_skip_inspection {
1103 debug!(
1104 correlation_id = %ctx.trace_id,
1105 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1106 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1107 );
1108 } else {
1109 debug!(
1110 correlation_id = %ctx.trace_id,
1111 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1112 "WebSocket upgrade successful"
1113 );
1114 }
1115 }
1116
1117 trace!(
1119 correlation_id = %ctx.trace_id,
1120 "Applying security headers"
1121 );
1122 self.apply_security_headers(upstream_response).ok();
1123
1124 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1126
1127 if let Some(ref rate_info) = ctx.rate_limit_info {
1129 upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1130 upstream_response
1131 .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1132 upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1133 }
1134
1135 if let Some(ref country_code) = ctx.geo_country_code {
1137 upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1138 }
1139
1140 if status >= 400 {
1142 trace!(
1143 correlation_id = %ctx.trace_id,
1144 status = status,
1145 "Handling error response"
1146 );
1147 self.handle_error_response(upstream_response, ctx).await?;
1148 }
1149
1150 self.metrics.record_request(
1152 ctx.route_id.as_deref().unwrap_or("unknown"),
1153 &ctx.method,
1154 status,
1155 duration,
1156 );
1157
1158 if let Some(ref upstream) = ctx.upstream {
1160 let success = status < 500;
1161
1162 trace!(
1163 correlation_id = %ctx.trace_id,
1164 upstream = %upstream,
1165 success = success,
1166 status = status,
1167 "Recording passive health check result"
1168 );
1169
1170 let error_msg = if !success {
1171 Some(format!("HTTP {}", status))
1172 } else {
1173 None
1174 };
1175 self.passive_health
1176 .record_outcome(upstream, success, error_msg.as_deref())
1177 .await;
1178
1179 if let Some(pool) = self.upstream_pools.get(upstream).await {
1181 pool.report_result(upstream, success).await;
1182 }
1183
1184 if !success {
1185 warn!(
1186 correlation_id = %ctx.trace_id,
1187 upstream = %upstream,
1188 status = status,
1189 "Upstream returned error status"
1190 );
1191 }
1192 }
1193
1194 if status >= 500 {
1196 error!(
1197 correlation_id = %ctx.trace_id,
1198 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1199 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1200 method = %ctx.method,
1201 path = %ctx.path,
1202 status = status,
1203 duration_ms = duration.as_millis(),
1204 attempts = ctx.upstream_attempts,
1205 "Request completed with server error"
1206 );
1207 } else if status >= 400 {
1208 warn!(
1209 correlation_id = %ctx.trace_id,
1210 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1211 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1212 method = %ctx.method,
1213 path = %ctx.path,
1214 status = status,
1215 duration_ms = duration.as_millis(),
1216 "Request completed with client error"
1217 );
1218 } else {
1219 debug!(
1220 correlation_id = %ctx.trace_id,
1221 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1222 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1223 method = %ctx.method,
1224 path = %ctx.path,
1225 status = status,
1226 duration_ms = duration.as_millis(),
1227 attempts = ctx.upstream_attempts,
1228 "Request completed"
1229 );
1230 }
1231
1232 Ok(())
1233 }
1234
1235 async fn upstream_request_filter(
1238 &self,
1239 _session: &mut Session,
1240 upstream_request: &mut pingora::http::RequestHeader,
1241 ctx: &mut Self::CTX,
1242 ) -> Result<()>
1243 where
1244 Self::CTX: Send + Sync,
1245 {
1246 trace!(
1247 correlation_id = %ctx.trace_id,
1248 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1249 "Applying upstream request modifications"
1250 );
1251
1252 upstream_request
1254 .insert_header("X-Trace-Id", &ctx.trace_id)
1255 .ok();
1256
1257 upstream_request
1259 .insert_header("X-Forwarded-By", "Sentinel")
1260 .ok();
1261
1262 if let Some(ref route_config) = ctx.route_config {
1265 let mods = route_config.policies.request_headers.clone();
1266
1267 for (name, value) in mods.set {
1269 upstream_request.insert_header(name, value).ok();
1270 }
1271
1272 for (name, value) in mods.add {
1274 upstream_request.append_header(name, value).ok();
1275 }
1276
1277 for name in &mods.remove {
1279 upstream_request.remove_header(name);
1280 }
1281
1282 trace!(
1283 correlation_id = %ctx.trace_id,
1284 "Applied request header modifications"
1285 );
1286 }
1287
1288 upstream_request.remove_header("X-Internal-Token");
1290 upstream_request.remove_header("Authorization-Internal");
1291
1292 Ok(())
1293 }
1294
1295 fn response_body_filter(
1301 &self,
1302 _session: &mut Session,
1303 body: &mut Option<Bytes>,
1304 end_of_stream: bool,
1305 ctx: &mut Self::CTX,
1306 ) -> Result<Option<Duration>, Box<Error>> {
1307 if ctx.is_websocket_upgrade {
1310 if let Some(ref handler) = ctx.websocket_handler {
1311 let handler = handler.clone();
1312 let data = body.take();
1313
1314 let result = tokio::task::block_in_place(|| {
1317 tokio::runtime::Handle::current()
1318 .block_on(async { handler.process_server_data(data).await })
1319 });
1320
1321 match result {
1322 crate::websocket::ProcessResult::Forward(data) => {
1323 *body = data;
1324 }
1325 crate::websocket::ProcessResult::Close(reason) => {
1326 warn!(
1327 correlation_id = %ctx.trace_id,
1328 code = reason.code,
1329 reason = %reason.reason,
1330 "WebSocket connection closed by agent (server->client)"
1331 );
1332 let close_frame =
1335 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1336 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1337 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1338 *body = Some(Bytes::from(encoded));
1339 }
1340 }
1341 }
1342 }
1343 return Ok(None);
1345 }
1346
1347 if let Some(ref chunk) = body {
1349 ctx.response_bytes += chunk.len() as u64;
1350
1351 trace!(
1352 correlation_id = %ctx.trace_id,
1353 chunk_size = chunk.len(),
1354 total_response_bytes = ctx.response_bytes,
1355 end_of_stream = end_of_stream,
1356 "Processing response body chunk"
1357 );
1358
1359 if ctx.response_body_inspection_enabled
1363 && !ctx.response_body_inspection_agents.is_empty()
1364 {
1365 let config = ctx
1366 .config
1367 .get_or_insert_with(|| self.config_manager.current());
1368 let max_inspection_bytes = config
1369 .waf
1370 .as_ref()
1371 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1372 .unwrap_or(1024 * 1024);
1373
1374 if ctx.response_body_bytes_inspected < max_inspection_bytes {
1375 let bytes_to_inspect = std::cmp::min(
1376 chunk.len() as u64,
1377 max_inspection_bytes - ctx.response_body_bytes_inspected,
1378 ) as usize;
1379
1380 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1384 ctx.response_body_chunk_index += 1;
1385
1386 trace!(
1387 correlation_id = %ctx.trace_id,
1388 bytes_inspected = ctx.response_body_bytes_inspected,
1389 max_inspection_bytes = max_inspection_bytes,
1390 chunk_index = ctx.response_body_chunk_index,
1391 "Tracking response body for inspection"
1392 );
1393 }
1394 }
1395 }
1396
1397 if end_of_stream {
1398 trace!(
1399 correlation_id = %ctx.trace_id,
1400 total_response_bytes = ctx.response_bytes,
1401 response_bytes_inspected = ctx.response_body_bytes_inspected,
1402 "Response body complete"
1403 );
1404 }
1405
1406 Ok(None)
1408 }
1409
1410 async fn connected_to_upstream(
1413 &self,
1414 _session: &mut Session,
1415 reused: bool,
1416 peer: &HttpPeer,
1417 #[cfg(unix)] _fd: RawFd,
1418 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1419 digest: Option<&Digest>,
1420 ctx: &mut Self::CTX,
1421 ) -> Result<(), Box<Error>> {
1422 ctx.connection_reused = reused;
1424
1425 if reused {
1427 trace!(
1428 correlation_id = %ctx.trace_id,
1429 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1430 peer_address = %peer.address(),
1431 "Reusing existing upstream connection"
1432 );
1433 } else {
1434 debug!(
1435 correlation_id = %ctx.trace_id,
1436 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1437 peer_address = %peer.address(),
1438 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1439 "Established new upstream connection"
1440 );
1441 }
1442
1443 Ok(())
1444 }
1445
1446 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1456 let route_id = match ctx.route_id.as_deref() {
1458 Some(id) => id,
1459 None => {
1460 trace!(
1461 correlation_id = %ctx.trace_id,
1462 "Cache filter: no route ID, skipping cache"
1463 );
1464 return Ok(());
1465 }
1466 };
1467
1468 if !self.cache_manager.is_enabled(route_id) {
1470 trace!(
1471 correlation_id = %ctx.trace_id,
1472 route_id = %route_id,
1473 "Cache disabled for route"
1474 );
1475 return Ok(());
1476 }
1477
1478 if !self
1480 .cache_manager
1481 .is_method_cacheable(route_id, &ctx.method)
1482 {
1483 trace!(
1484 correlation_id = %ctx.trace_id,
1485 route_id = %route_id,
1486 method = %ctx.method,
1487 "Method not cacheable"
1488 );
1489 return Ok(());
1490 }
1491
1492 debug!(
1494 correlation_id = %ctx.trace_id,
1495 route_id = %route_id,
1496 method = %ctx.method,
1497 path = %ctx.path,
1498 "Enabling HTTP caching for request"
1499 );
1500
1501 let storage = get_cache_storage();
1503 let eviction = get_cache_eviction();
1504 let cache_lock = get_cache_lock();
1505
1506 session.cache.enable(
1508 storage,
1509 Some(eviction),
1510 None, Some(cache_lock),
1512 None, );
1514
1515 ctx.cache_eligible = true;
1517
1518 trace!(
1519 correlation_id = %ctx.trace_id,
1520 route_id = %route_id,
1521 cache_enabled = session.cache.enabled(),
1522 "Cache enabled for request"
1523 );
1524
1525 Ok(())
1526 }
1527
1528 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1533 let req_header = session.req_header();
1534 let method = req_header.method.as_str();
1535 let path = req_header.uri.path();
1536 let host = ctx.host.as_deref().unwrap_or("unknown");
1537 let query = req_header.uri.query();
1538
1539 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1541
1542 trace!(
1543 correlation_id = %ctx.trace_id,
1544 cache_key = %key_string,
1545 "Generated cache key"
1546 );
1547
1548 Ok(CacheKey::default(req_header))
1551 }
1552
1553 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1558 session.cache.cache_miss();
1560
1561 if let Some(route_id) = ctx.route_id.as_deref() {
1563 self.cache_manager.stats().record_miss();
1564
1565 trace!(
1566 correlation_id = %ctx.trace_id,
1567 route_id = %route_id,
1568 path = %ctx.path,
1569 "Cache miss"
1570 );
1571 }
1572 }
1573
1574 async fn cache_hit_filter(
1580 &self,
1581 session: &mut Session,
1582 meta: &CacheMeta,
1583 _hit_handler: &mut HitHandler,
1584 is_fresh: bool,
1585 ctx: &mut Self::CTX,
1586 ) -> Result<Option<ForcedInvalidationKind>>
1587 where
1588 Self::CTX: Send + Sync,
1589 {
1590 let req_header = session.req_header();
1592 let method = req_header.method.as_str();
1593 let path = req_header.uri.path();
1594 let host = req_header.uri.host().unwrap_or("localhost");
1595 let query = req_header.uri.query();
1596
1597 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1599
1600 if self.cache_manager.should_invalidate(&cache_key) {
1602 info!(
1603 correlation_id = %ctx.trace_id,
1604 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1605 cache_key = %cache_key,
1606 "Cache entry invalidated by purge request"
1607 );
1608 return Ok(Some(ForcedInvalidationKind::ForceExpired));
1610 }
1611
1612 if is_fresh {
1614 self.cache_manager.stats().record_hit();
1615
1616 debug!(
1617 correlation_id = %ctx.trace_id,
1618 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1619 is_fresh = is_fresh,
1620 "Cache hit (fresh)"
1621 );
1622 } else {
1623 trace!(
1624 correlation_id = %ctx.trace_id,
1625 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1626 is_fresh = is_fresh,
1627 "Cache hit (stale)"
1628 );
1629 }
1630
1631 Ok(None)
1633 }
1634
1635 fn response_cache_filter(
1640 &self,
1641 _session: &Session,
1642 resp: &ResponseHeader,
1643 ctx: &mut Self::CTX,
1644 ) -> Result<RespCacheable> {
1645 let route_id = match ctx.route_id.as_deref() {
1646 Some(id) => id,
1647 None => {
1648 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1649 "no_route",
1650 )));
1651 }
1652 };
1653
1654 if !self.cache_manager.is_enabled(route_id) {
1656 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1657 "disabled",
1658 )));
1659 }
1660
1661 let status = resp.status.as_u16();
1662
1663 if !self.cache_manager.is_status_cacheable(route_id, status) {
1665 trace!(
1666 correlation_id = %ctx.trace_id,
1667 route_id = %route_id,
1668 status = status,
1669 "Status code not cacheable"
1670 );
1671 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1672 }
1673
1674 if let Some(cache_control) = resp.headers.get("cache-control") {
1676 if let Ok(cc_str) = cache_control.to_str() {
1677 if crate::cache::CacheManager::is_no_cache(cc_str) {
1678 trace!(
1679 correlation_id = %ctx.trace_id,
1680 route_id = %route_id,
1681 cache_control = %cc_str,
1682 "Response has no-cache directive"
1683 );
1684 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1685 }
1686 }
1687 }
1688
1689 let cache_control = resp
1691 .headers
1692 .get("cache-control")
1693 .and_then(|v| v.to_str().ok());
1694 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1695
1696 if ttl.is_zero() {
1697 trace!(
1698 correlation_id = %ctx.trace_id,
1699 route_id = %route_id,
1700 "TTL is zero, not caching"
1701 );
1702 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1703 }
1704
1705 let config = self
1707 .cache_manager
1708 .get_route_config(route_id)
1709 .unwrap_or_default();
1710
1711 let now = std::time::SystemTime::now();
1713 let fresh_until = now + ttl;
1714
1715 let header = resp.clone();
1717
1718 let cache_meta = CacheMeta::new(
1720 fresh_until,
1721 now,
1722 config.stale_while_revalidate_secs as u32,
1723 config.stale_if_error_secs as u32,
1724 header,
1725 );
1726
1727 self.cache_manager.stats().record_store();
1729
1730 debug!(
1731 correlation_id = %ctx.trace_id,
1732 route_id = %route_id,
1733 status = status,
1734 ttl_secs = ttl.as_secs(),
1735 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1736 stale_if_error_secs = config.stale_if_error_secs,
1737 "Caching response"
1738 );
1739
1740 Ok(RespCacheable::Cacheable(cache_meta))
1741 }
1742
1743 fn should_serve_stale(
1747 &self,
1748 _session: &mut Session,
1749 ctx: &mut Self::CTX,
1750 error: Option<&Error>,
1751 ) -> bool {
1752 let route_id = match ctx.route_id.as_deref() {
1753 Some(id) => id,
1754 None => return false,
1755 };
1756
1757 let config = match self.cache_manager.get_route_config(route_id) {
1759 Some(c) => c,
1760 None => return false,
1761 };
1762
1763 if let Some(e) = error {
1765 if e.esource() == &pingora::ErrorSource::Upstream {
1767 debug!(
1768 correlation_id = %ctx.trace_id,
1769 route_id = %route_id,
1770 error = %e,
1771 stale_if_error_secs = config.stale_if_error_secs,
1772 "Considering stale-if-error"
1773 );
1774 return config.stale_if_error_secs > 0;
1775 }
1776 }
1777
1778 if error.is_none() && config.stale_while_revalidate_secs > 0 {
1780 trace!(
1781 correlation_id = %ctx.trace_id,
1782 route_id = %route_id,
1783 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1784 "Allowing stale-while-revalidate"
1785 );
1786 return true;
1787 }
1788
1789 false
1790 }
1791
1792 fn range_header_filter(
1802 &self,
1803 session: &mut Session,
1804 response: &mut ResponseHeader,
1805 ctx: &mut Self::CTX,
1806 ) -> pingora_proxy::RangeType
1807 where
1808 Self::CTX: Send + Sync,
1809 {
1810 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1812 matches!(
1814 config.service_type,
1815 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1816 )
1817 });
1818
1819 if !supports_range {
1820 trace!(
1821 correlation_id = %ctx.trace_id,
1822 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1823 "Range request not supported for this route type"
1824 );
1825 return pingora_proxy::RangeType::None;
1826 }
1827
1828 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1830
1831 match &range_type {
1832 pingora_proxy::RangeType::None => {
1833 trace!(
1834 correlation_id = %ctx.trace_id,
1835 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1836 "No range request or not applicable"
1837 );
1838 }
1839 pingora_proxy::RangeType::Single(range) => {
1840 trace!(
1841 correlation_id = %ctx.trace_id,
1842 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1843 range_start = range.start,
1844 range_end = range.end,
1845 "Processing single-range request"
1846 );
1847 }
1848 pingora_proxy::RangeType::Multi(multi) => {
1849 trace!(
1850 correlation_id = %ctx.trace_id,
1851 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1852 range_count = multi.ranges.len(),
1853 "Processing multi-range request"
1854 );
1855 }
1856 pingora_proxy::RangeType::Invalid => {
1857 debug!(
1858 correlation_id = %ctx.trace_id,
1859 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1860 "Invalid range header"
1861 );
1862 }
1863 }
1864
1865 range_type
1866 }
1867
1868 async fn fail_to_proxy(
1871 &self,
1872 session: &mut Session,
1873 e: &Error,
1874 ctx: &mut Self::CTX,
1875 ) -> pingora_proxy::FailToProxy
1876 where
1877 Self::CTX: Send + Sync,
1878 {
1879 let error_code = match e.etype() {
1880 ErrorType::ConnectRefused => 503,
1882 ErrorType::ConnectTimedout => 504,
1883 ErrorType::ConnectNoRoute => 502,
1884
1885 ErrorType::ReadTimedout => 504,
1887 ErrorType::WriteTimedout => 504,
1888
1889 ErrorType::TLSHandshakeFailure => 502,
1891 ErrorType::InvalidCert => 502,
1892
1893 ErrorType::InvalidHTTPHeader => 400,
1895 ErrorType::H2Error => 502,
1896
1897 ErrorType::ConnectProxyFailure => 502,
1899 ErrorType::ConnectionClosed => 502,
1900
1901 ErrorType::HTTPStatus(status) => *status,
1903
1904 ErrorType::InternalError => {
1906 let error_str = e.to_string();
1908 if error_str.contains("upstream")
1909 || error_str.contains("DNS")
1910 || error_str.contains("resolve")
1911 {
1912 502
1913 } else {
1914 500
1915 }
1916 }
1917
1918 _ => 502,
1920 };
1921
1922 error!(
1923 correlation_id = %ctx.trace_id,
1924 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1925 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1926 error_type = ?e.etype(),
1927 error = %e,
1928 error_code = error_code,
1929 "Proxy error occurred"
1930 );
1931
1932 self.metrics
1934 .record_blocked_request(&format!("proxy_error_{}", error_code));
1935
1936 let error_message = match error_code {
1940 400 => "Bad Request",
1941 502 => "Bad Gateway",
1942 503 => "Service Unavailable",
1943 504 => "Gateway Timeout",
1944 _ => "Internal Server Error",
1945 };
1946
1947 let body = format!(
1949 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
1950 error_code, error_message, ctx.trace_id
1951 );
1952
1953 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
1955 header
1956 .insert_header("Content-Type", "application/json")
1957 .ok();
1958 header
1959 .insert_header("Content-Length", body.len().to_string())
1960 .ok();
1961 header
1962 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
1963 .ok();
1964 header.insert_header("Connection", "close").ok();
1965
1966 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
1968 warn!(
1969 correlation_id = %ctx.trace_id,
1970 error = %write_err,
1971 "Failed to write error response header"
1972 );
1973 } else {
1974 if let Err(write_err) = session
1976 .write_response_body(Some(bytes::Bytes::from(body)), true)
1977 .await
1978 {
1979 warn!(
1980 correlation_id = %ctx.trace_id,
1981 error = %write_err,
1982 "Failed to write error response body"
1983 );
1984 }
1985 }
1986
1987 pingora_proxy::FailToProxy {
1990 error_code,
1991 can_reuse_downstream: false,
1992 }
1993 }
1994
1995 fn error_while_proxy(
2001 &self,
2002 peer: &HttpPeer,
2003 session: &mut Session,
2004 e: Box<Error>,
2005 ctx: &mut Self::CTX,
2006 client_reused: bool,
2007 ) -> Box<Error> {
2008 let error_type = e.etype().clone();
2009 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2010
2011 let is_retryable = matches!(
2013 error_type,
2014 ErrorType::ConnectTimedout
2015 | ErrorType::ReadTimedout
2016 | ErrorType::WriteTimedout
2017 | ErrorType::ConnectionClosed
2018 | ErrorType::ConnectRefused
2019 );
2020
2021 warn!(
2023 correlation_id = %ctx.trace_id,
2024 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2025 upstream = %upstream_id,
2026 peer_address = %peer.address(),
2027 error_type = ?error_type,
2028 error = %e,
2029 client_reused = client_reused,
2030 is_retryable = is_retryable,
2031 "Error during proxy operation"
2032 );
2033
2034 let peer_address = peer.address().to_string();
2037 let upstream_pools = self.upstream_pools.clone();
2038 let upstream_id_owned = upstream_id.to_string();
2039 tokio::spawn(async move {
2040 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2041 pool.report_result(&peer_address, false).await;
2042 }
2043 });
2044
2045 self.metrics
2047 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2048
2049 let mut enhanced_error = e.more_context(format!(
2051 "Upstream: {}, Peer: {}, Attempts: {}",
2052 upstream_id,
2053 peer.address(),
2054 ctx.upstream_attempts
2055 ));
2056
2057 if is_retryable {
2062 let can_retry = if client_reused {
2063 !session.as_ref().retry_buffer_truncated()
2065 } else {
2066 true
2068 };
2069
2070 enhanced_error.retry.decide_reuse(can_retry);
2071
2072 if can_retry {
2073 debug!(
2074 correlation_id = %ctx.trace_id,
2075 upstream = %upstream_id,
2076 error_type = ?error_type,
2077 "Error is retryable, will attempt retry"
2078 );
2079 }
2080 } else {
2081 enhanced_error.retry.decide_reuse(false);
2083 }
2084
2085 enhanced_error
2086 }
2087
2088 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2089 self.reload_coordinator.dec_requests();
2091
2092 let duration = ctx.elapsed();
2093
2094 let status = session
2096 .response_written()
2097 .map(|r| r.status.as_u16())
2098 .unwrap_or(0);
2099
2100 if self.log_manager.access_log_enabled() {
2102 let access_entry = AccessLogEntry {
2103 timestamp: chrono::Utc::now().to_rfc3339(),
2104 trace_id: ctx.trace_id.clone(),
2105 method: ctx.method.clone(),
2106 path: ctx.path.clone(),
2107 query: ctx.query.clone(),
2108 protocol: "HTTP/1.1".to_string(),
2109 status,
2110 body_bytes: ctx.response_bytes,
2111 duration_ms: duration.as_millis() as u64,
2112 client_ip: ctx.client_ip.clone(),
2113 user_agent: ctx.user_agent.clone(),
2114 referer: ctx.referer.clone(),
2115 host: ctx.host.clone(),
2116 route_id: ctx.route_id.clone(),
2117 upstream: ctx.upstream.clone(),
2118 upstream_attempts: ctx.upstream_attempts,
2119 instance_id: self.app_state.instance_id.clone(),
2120 };
2121 self.log_manager.log_access(&access_entry);
2122 }
2123
2124 if tracing::enabled!(tracing::Level::DEBUG) {
2126 debug!(
2127 trace_id = %ctx.trace_id,
2128 method = %ctx.method,
2129 path = %ctx.path,
2130 route_id = ?ctx.route_id,
2131 upstream = ?ctx.upstream,
2132 status = status,
2133 duration_ms = duration.as_millis() as u64,
2134 upstream_attempts = ctx.upstream_attempts,
2135 error = ?_error.map(|e| e.to_string()),
2136 "Request completed"
2137 );
2138 }
2139
2140 if ctx.is_websocket_upgrade && status == 101 {
2142 info!(
2143 trace_id = %ctx.trace_id,
2144 route_id = ?ctx.route_id,
2145 upstream = ?ctx.upstream,
2146 client_ip = %ctx.client_ip,
2147 "WebSocket connection established"
2148 );
2149 }
2150 }
2151}
2152
2153impl SentinelProxy {
2158 async fn process_body_chunk_streaming(
2160 &self,
2161 body: &mut Option<Bytes>,
2162 end_of_stream: bool,
2163 ctx: &mut RequestContext,
2164 ) -> Result<(), Box<Error>> {
2165 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2167 let chunk_index = ctx.request_body_chunk_index;
2168 ctx.request_body_chunk_index += 1;
2169 ctx.body_bytes_inspected += chunk_data.len() as u64;
2170
2171 debug!(
2172 correlation_id = %ctx.trace_id,
2173 chunk_index = chunk_index,
2174 chunk_size = chunk_data.len(),
2175 end_of_stream = end_of_stream,
2176 "Streaming body chunk to agents"
2177 );
2178
2179 let agent_ctx = crate::agents::AgentCallContext {
2181 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2182 metadata: sentinel_agent_protocol::RequestMetadata {
2183 correlation_id: ctx.trace_id.clone(),
2184 request_id: ctx.trace_id.clone(),
2185 client_ip: ctx.client_ip.clone(),
2186 client_port: 0,
2187 server_name: ctx.host.clone(),
2188 protocol: "HTTP/1.1".to_string(),
2189 tls_version: None,
2190 tls_cipher: None,
2191 route_id: ctx.route_id.clone(),
2192 upstream_id: ctx.upstream.clone(),
2193 timestamp: chrono::Utc::now().to_rfc3339(),
2194 },
2195 route_id: ctx.route_id.clone(),
2196 upstream_id: ctx.upstream.clone(),
2197 request_body: None, response_body: None,
2199 };
2200
2201 let agent_ids = ctx.body_inspection_agents.clone();
2202 let total_size = None; match self
2205 .agent_manager
2206 .process_request_body_streaming(
2207 &agent_ctx,
2208 &chunk_data,
2209 end_of_stream,
2210 chunk_index,
2211 ctx.body_bytes_inspected as usize,
2212 total_size,
2213 &agent_ids,
2214 )
2215 .await
2216 {
2217 Ok(decision) => {
2218 ctx.agent_needs_more = decision.needs_more;
2220
2221 if let Some(ref mutation) = decision.request_body_mutation {
2223 if !mutation.is_pass_through() {
2224 if mutation.is_drop() {
2225 *body = None;
2227 trace!(
2228 correlation_id = %ctx.trace_id,
2229 chunk_index = chunk_index,
2230 "Agent dropped body chunk"
2231 );
2232 } else if let Some(ref new_data) = mutation.data {
2233 *body = Some(Bytes::from(new_data.clone()));
2235 trace!(
2236 correlation_id = %ctx.trace_id,
2237 chunk_index = chunk_index,
2238 original_size = chunk_data.len(),
2239 new_size = new_data.len(),
2240 "Agent mutated body chunk"
2241 );
2242 }
2243 }
2244 }
2245
2246 if !decision.needs_more && !decision.is_allow() {
2248 warn!(
2249 correlation_id = %ctx.trace_id,
2250 action = ?decision.action,
2251 "Agent blocked request body"
2252 );
2253 self.metrics.record_blocked_request("agent_body_inspection");
2254
2255 let (status, message) = match &decision.action {
2256 crate::agents::AgentAction::Block { status, body, .. } => (
2257 *status,
2258 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2259 ),
2260 _ => (403, "Forbidden".to_string()),
2261 };
2262
2263 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2264 }
2265
2266 trace!(
2267 correlation_id = %ctx.trace_id,
2268 needs_more = decision.needs_more,
2269 "Agent processed body chunk"
2270 );
2271 }
2272 Err(e) => {
2273 let fail_closed = ctx
2274 .route_config
2275 .as_ref()
2276 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2277 .unwrap_or(false);
2278
2279 if fail_closed {
2280 error!(
2281 correlation_id = %ctx.trace_id,
2282 error = %e,
2283 "Agent streaming body inspection failed, blocking (fail-closed)"
2284 );
2285 return Err(Error::explain(
2286 ErrorType::HTTPStatus(503),
2287 "Service unavailable",
2288 ));
2289 } else {
2290 warn!(
2291 correlation_id = %ctx.trace_id,
2292 error = %e,
2293 "Agent streaming body inspection failed, allowing (fail-open)"
2294 );
2295 }
2296 }
2297 }
2298
2299 Ok(())
2300 }
2301
2302 async fn send_buffered_body_to_agents(
2304 &self,
2305 end_of_stream: bool,
2306 ctx: &mut RequestContext,
2307 ) -> Result<(), Box<Error>> {
2308 debug!(
2309 correlation_id = %ctx.trace_id,
2310 buffer_size = ctx.body_buffer.len(),
2311 end_of_stream = end_of_stream,
2312 agent_count = ctx.body_inspection_agents.len(),
2313 decompression_enabled = ctx.decompression_enabled,
2314 "Sending buffered body to agents for inspection"
2315 );
2316
2317 let body_for_inspection = if ctx.decompression_enabled {
2319 if let Some(ref encoding) = ctx.body_content_encoding {
2320 let config = crate::decompression::DecompressionConfig {
2321 max_ratio: ctx.max_decompression_ratio,
2322 max_output_bytes: ctx.max_decompression_bytes,
2323 };
2324
2325 match crate::decompression::decompress_body(
2326 &ctx.body_buffer,
2327 encoding,
2328 &config,
2329 ) {
2330 Ok(result) => {
2331 ctx.body_was_decompressed = true;
2332 self.metrics
2333 .record_decompression_success(encoding, result.ratio);
2334 debug!(
2335 correlation_id = %ctx.trace_id,
2336 encoding = %encoding,
2337 compressed_size = result.compressed_size,
2338 decompressed_size = result.decompressed_size,
2339 ratio = result.ratio,
2340 "Body decompressed for agent inspection"
2341 );
2342 result.data
2343 }
2344 Err(e) => {
2345 let failure_reason = match &e {
2347 crate::decompression::DecompressionError::RatioExceeded { .. } => {
2348 "ratio_exceeded"
2349 }
2350 crate::decompression::DecompressionError::SizeExceeded { .. } => {
2351 "size_exceeded"
2352 }
2353 crate::decompression::DecompressionError::InvalidData { .. } => {
2354 "invalid_data"
2355 }
2356 crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2357 "unsupported"
2358 }
2359 crate::decompression::DecompressionError::IoError(_) => "io_error",
2360 };
2361 self.metrics
2362 .record_decompression_failure(encoding, failure_reason);
2363
2364 let fail_closed = ctx
2366 .route_config
2367 .as_ref()
2368 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2369 .unwrap_or(false);
2370
2371 if fail_closed {
2372 error!(
2373 correlation_id = %ctx.trace_id,
2374 error = %e,
2375 encoding = %encoding,
2376 "Decompression failed, blocking (fail-closed)"
2377 );
2378 return Err(Error::explain(
2379 ErrorType::HTTPStatus(400),
2380 "Invalid compressed body",
2381 ));
2382 } else {
2383 warn!(
2384 correlation_id = %ctx.trace_id,
2385 error = %e,
2386 encoding = %encoding,
2387 "Decompression failed, sending compressed body (fail-open)"
2388 );
2389 ctx.body_buffer.clone()
2390 }
2391 }
2392 }
2393 } else {
2394 ctx.body_buffer.clone()
2395 }
2396 } else {
2397 ctx.body_buffer.clone()
2398 };
2399
2400 let agent_ctx = crate::agents::AgentCallContext {
2401 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2402 metadata: sentinel_agent_protocol::RequestMetadata {
2403 correlation_id: ctx.trace_id.clone(),
2404 request_id: ctx.trace_id.clone(),
2405 client_ip: ctx.client_ip.clone(),
2406 client_port: 0,
2407 server_name: ctx.host.clone(),
2408 protocol: "HTTP/1.1".to_string(),
2409 tls_version: None,
2410 tls_cipher: None,
2411 route_id: ctx.route_id.clone(),
2412 upstream_id: ctx.upstream.clone(),
2413 timestamp: chrono::Utc::now().to_rfc3339(),
2414 },
2415 route_id: ctx.route_id.clone(),
2416 upstream_id: ctx.upstream.clone(),
2417 request_body: Some(body_for_inspection.clone()),
2418 response_body: None,
2419 };
2420
2421 let agent_ids = ctx.body_inspection_agents.clone();
2422 match self
2423 .agent_manager
2424 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2425 .await
2426 {
2427 Ok(decision) => {
2428 if !decision.is_allow() {
2429 warn!(
2430 correlation_id = %ctx.trace_id,
2431 action = ?decision.action,
2432 "Agent blocked request body"
2433 );
2434 self.metrics.record_blocked_request("agent_body_inspection");
2435
2436 let (status, message) = match &decision.action {
2437 crate::agents::AgentAction::Block { status, body, .. } => (
2438 *status,
2439 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2440 ),
2441 _ => (403, "Forbidden".to_string()),
2442 };
2443
2444 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2445 }
2446
2447 trace!(
2448 correlation_id = %ctx.trace_id,
2449 "Agent allowed request body"
2450 );
2451 }
2452 Err(e) => {
2453 let fail_closed = ctx
2454 .route_config
2455 .as_ref()
2456 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2457 .unwrap_or(false);
2458
2459 if fail_closed {
2460 error!(
2461 correlation_id = %ctx.trace_id,
2462 error = %e,
2463 "Agent body inspection failed, blocking (fail-closed)"
2464 );
2465 return Err(Error::explain(
2466 ErrorType::HTTPStatus(503),
2467 "Service unavailable",
2468 ));
2469 } else {
2470 warn!(
2471 correlation_id = %ctx.trace_id,
2472 error = %e,
2473 "Agent body inspection failed, allowing (fail-open)"
2474 );
2475 }
2476 }
2477 }
2478
2479 Ok(())
2480 }
2481}