1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
23use crate::rate_limit::HeaderAccessor;
24use crate::routing::RequestInfo;
25
26use super::context::RequestContext;
27use super::SentinelProxy;
28
29struct NoHeaderAccessor;
31impl HeaderAccessor for NoHeaderAccessor {
32 fn get_header(&self, _name: &str) -> Option<String> {
33 None
34 }
35}
36
37#[async_trait]
38impl ProxyHttp for SentinelProxy {
39 type CTX = RequestContext;
40
41 fn new_ctx(&self) -> Self::CTX {
42 RequestContext::new()
43 }
44
45 fn fail_to_connect(
46 &self,
47 _session: &mut Session,
48 peer: &HttpPeer,
49 ctx: &mut Self::CTX,
50 e: Box<Error>,
51 ) -> Box<Error> {
52 error!(
53 correlation_id = %ctx.trace_id,
54 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
55 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
56 peer_address = %peer.address(),
57 error = %e,
58 "Failed to connect to upstream peer"
59 );
60 e
62 }
63
64 async fn early_request_filter(
67 &self,
68 session: &mut Session,
69 ctx: &mut Self::CTX,
70 ) -> Result<(), Box<Error>> {
71 let req_header = session.req_header();
73 let method = req_header.method.as_str();
74 let path = req_header.uri.path();
75 let host = req_header
76 .headers
77 .get("host")
78 .and_then(|h| h.to_str().ok())
79 .unwrap_or("");
80
81 ctx.method = method.to_string();
82 ctx.path = path.to_string();
83 ctx.host = Some(host.to_string());
84
85 let route_match = {
87 let route_matcher = self.route_matcher.read();
88 let request_info = RequestInfo::new(method, path, host);
89 match route_matcher.match_request(&request_info) {
90 Some(m) => m,
91 None => return Ok(()), }
93 };
94
95 ctx.trace_id = self.get_trace_id(session);
96 ctx.route_id = Some(route_match.route_id.to_string());
97 ctx.route_config = Some(route_match.config.clone());
98
99 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
101 trace!(
102 correlation_id = %ctx.trace_id,
103 route_id = %route_match.route_id,
104 builtin_handler = ?route_match.config.builtin_handler,
105 "Handling builtin route in early_request_filter"
106 );
107
108 let handled = self
110 .handle_builtin_route(session, ctx, &route_match)
111 .await?;
112
113 if handled {
114 return Err(Error::explain(
116 ErrorType::InternalError,
117 "Builtin handler complete",
118 ));
119 }
120 }
121
122 Ok(())
123 }
124
125 async fn upstream_peer(
126 &self,
127 session: &mut Session,
128 ctx: &mut Self::CTX,
129 ) -> Result<Box<HttpPeer>, Box<Error>> {
130 self.reload_coordinator.inc_requests();
132
133 if ctx.config.is_none() {
135 ctx.config = Some(self.config_manager.current());
136 }
137
138 if ctx.client_ip.is_empty() {
140 ctx.client_ip = session
141 .client_addr()
142 .map(|a| a.to_string())
143 .unwrap_or_else(|| "unknown".to_string());
144 }
145
146 let req_header = session.req_header();
147
148 if ctx.method.is_empty() {
150 ctx.method = req_header.method.to_string();
151 ctx.path = req_header.uri.path().to_string();
152 ctx.query = req_header.uri.query().map(|q| q.to_string());
153 ctx.host = req_header
154 .headers
155 .get("host")
156 .and_then(|v| v.to_str().ok())
157 .map(|s| s.to_string());
158 }
159 ctx.user_agent = req_header
160 .headers
161 .get("user-agent")
162 .and_then(|v| v.to_str().ok())
163 .map(|s| s.to_string());
164 ctx.referer = req_header
165 .headers
166 .get("referer")
167 .and_then(|v| v.to_str().ok())
168 .map(|s| s.to_string());
169
170 trace!(
171 correlation_id = %ctx.trace_id,
172 client_ip = %ctx.client_ip,
173 "Request received, initializing context"
174 );
175
176 let route_match = if let Some(ref route_config) = ctx.route_config {
178 let route_id = ctx.route_id.as_deref().unwrap_or("");
179 crate::routing::RouteMatch {
180 route_id: sentinel_common::RouteId::new(route_id),
181 config: route_config.clone(),
182 }
183 } else {
184 let (match_result, route_duration) = {
186 let route_matcher = self.route_matcher.read();
187 let host = ctx.host.as_deref().unwrap_or("");
188
189 let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
191
192 if route_matcher.needs_headers() {
194 request_info = request_info
195 .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
196 }
197
198 if route_matcher.needs_query_params() {
200 request_info =
201 request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
202 }
203
204 trace!(
205 correlation_id = %ctx.trace_id,
206 method = %request_info.method,
207 path = %request_info.path,
208 host = %request_info.host,
209 "Built request info for route matching"
210 );
211
212 let route_start = std::time::Instant::now();
213 let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
214 warn!(
215 correlation_id = %ctx.trace_id,
216 method = %request_info.method,
217 path = %request_info.path,
218 host = %request_info.host,
219 "No matching route found for request"
220 );
221 Error::explain(ErrorType::InternalError, "No matching route found")
222 })?;
223 let route_duration = route_start.elapsed();
224 (route_match, route_duration)
226 };
227
228 ctx.route_id = Some(match_result.route_id.to_string());
229 ctx.route_config = Some(match_result.config.clone());
230
231 trace!(
232 correlation_id = %ctx.trace_id,
233 route_id = %match_result.route_id,
234 route_duration_us = route_duration.as_micros(),
235 service_type = ?match_result.config.service_type,
236 "Route matched"
237 );
238 match_result
239 };
240
241 if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
243 trace!(
244 correlation_id = %ctx.trace_id,
245 route_id = %route_match.route_id,
246 builtin_handler = ?route_match.config.builtin_handler,
247 "Route type is builtin, skipping upstream"
248 );
249 ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
251 return Err(Error::explain(
253 ErrorType::InternalError,
254 "Builtin handler handled in request_filter",
255 ));
256 }
257
258 if route_match.config.service_type == sentinel_config::ServiceType::Static {
260 trace!(
261 correlation_id = %ctx.trace_id,
262 route_id = %route_match.route_id,
263 "Route type is static, checking for static server"
264 );
265 if self
267 .static_servers
268 .get(route_match.route_id.as_str())
269 .await
270 .is_some()
271 {
272 ctx.upstream = Some(format!("_static_{}", route_match.route_id));
274 info!(
275 correlation_id = %ctx.trace_id,
276 route_id = %route_match.route_id,
277 path = %ctx.path,
278 "Serving static file"
279 );
280 return Err(Error::explain(
282 ErrorType::InternalError,
283 "Static file serving handled in request_filter",
284 ));
285 }
286 }
287
288 if let Some(ref upstream) = route_match.config.upstream {
290 ctx.upstream = Some(upstream.clone());
291 trace!(
292 correlation_id = %ctx.trace_id,
293 route_id = %route_match.route_id,
294 upstream = %upstream,
295 "Upstream configured for route"
296 );
297 } else {
298 error!(
299 correlation_id = %ctx.trace_id,
300 route_id = %route_match.route_id,
301 "Route has no upstream configured"
302 );
303 return Err(Error::explain(
304 ErrorType::InternalError,
305 format!(
306 "Route '{}' has no upstream configured",
307 route_match.route_id
308 ),
309 ));
310 }
311
312 debug!(
313 correlation_id = %ctx.trace_id,
314 route_id = %route_match.route_id,
315 upstream = ?ctx.upstream,
316 method = %req_header.method,
317 path = %req_header.uri.path(),
318 host = ctx.host.as_deref().unwrap_or("-"),
319 client_ip = %ctx.client_ip,
320 "Processing request"
321 );
322
323 if ctx
325 .upstream
326 .as_ref()
327 .is_some_and(|u| u.starts_with("_static_"))
328 {
329 return Err(Error::explain(
331 ErrorType::InternalError,
332 "Static route should be handled in request_filter",
333 ));
334 }
335
336 let upstream_name = ctx
337 .upstream
338 .as_ref()
339 .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
340
341 trace!(
342 correlation_id = %ctx.trace_id,
343 upstream = %upstream_name,
344 "Looking up upstream pool"
345 );
346
347 let pool = self
348 .upstream_pools
349 .get(upstream_name)
350 .await
351 .ok_or_else(|| {
352 error!(
353 correlation_id = %ctx.trace_id,
354 upstream = %upstream_name,
355 "Upstream pool not found"
356 );
357 Error::explain(
358 ErrorType::InternalError,
359 format!("Upstream pool '{}' not found", upstream_name),
360 )
361 })?;
362
363 let max_retries = route_match
365 .config
366 .retry_policy
367 .as_ref()
368 .map(|r| r.max_attempts)
369 .unwrap_or(1);
370
371 trace!(
372 correlation_id = %ctx.trace_id,
373 upstream = %upstream_name,
374 max_retries = max_retries,
375 "Starting upstream peer selection"
376 );
377
378 let mut last_error = None;
379 let selection_start = std::time::Instant::now();
380
381 for attempt in 1..=max_retries {
382 ctx.upstream_attempts = attempt;
383
384 trace!(
385 correlation_id = %ctx.trace_id,
386 upstream = %upstream_name,
387 attempt = attempt,
388 max_retries = max_retries,
389 "Attempting to select upstream peer"
390 );
391
392 match pool.select_peer(None).await {
393 Ok(peer) => {
394 let selection_duration = selection_start.elapsed();
395 debug!(
396 correlation_id = %ctx.trace_id,
397 upstream = %upstream_name,
398 peer_address = %peer.address(),
399 attempt = attempt,
400 selection_duration_us = selection_duration.as_micros(),
401 "Selected upstream peer"
402 );
403 return Ok(Box::new(peer));
404 }
405 Err(e) => {
406 warn!(
407 correlation_id = %ctx.trace_id,
408 upstream = %upstream_name,
409 attempt = attempt,
410 max_retries = max_retries,
411 error = %e,
412 "Failed to select upstream peer"
413 );
414 last_error = Some(e);
415
416 if attempt < max_retries {
417 let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
419 trace!(
420 correlation_id = %ctx.trace_id,
421 backoff_ms = backoff.as_millis(),
422 "Backing off before retry"
423 );
424 sleep(backoff).await;
425 }
426 }
427 }
428 }
429
430 let selection_duration = selection_start.elapsed();
431 error!(
432 correlation_id = %ctx.trace_id,
433 upstream = %upstream_name,
434 attempts = max_retries,
435 selection_duration_ms = selection_duration.as_millis(),
436 last_error = ?last_error,
437 "All upstream selection attempts failed"
438 );
439
440 Err(Error::explain(
441 ErrorType::InternalError,
442 format!("All upstream attempts failed: {:?}", last_error),
443 ))
444 }
445
446 async fn request_filter(
447 &self,
448 session: &mut Session,
449 ctx: &mut Self::CTX,
450 ) -> Result<bool, Box<Error>> {
451 trace!(
452 correlation_id = %ctx.trace_id,
453 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
454 "Starting request filter phase"
455 );
456
457 if let Some(route_id) = ctx.route_id.as_deref() {
460 if self.rate_limit_manager.has_route_limiter(route_id) {
461 let rate_result = self.rate_limit_manager.check(
462 route_id,
463 &ctx.client_ip,
464 &ctx.path,
465 Option::<&NoHeaderAccessor>::None,
466 );
467
468 if !rate_result.allowed {
469 use sentinel_config::RateLimitAction;
470
471 match rate_result.action {
472 RateLimitAction::Reject => {
473 warn!(
474 correlation_id = %ctx.trace_id,
475 route_id = route_id,
476 client_ip = %ctx.client_ip,
477 limiter = %rate_result.limiter,
478 "Request rate limited"
479 );
480 self.metrics.record_blocked_request("rate_limited");
481
482 let audit_entry = AuditLogEntry::rate_limited(
484 &ctx.trace_id,
485 &ctx.method,
486 &ctx.path,
487 &ctx.client_ip,
488 &rate_result.limiter,
489 )
490 .with_route_id(route_id)
491 .with_status_code(rate_result.status_code);
492 self.log_manager.log_audit(&audit_entry);
493
494 let body = rate_result
496 .message
497 .unwrap_or_else(|| "Rate limit exceeded".to_string());
498 crate::http_helpers::write_error(
499 session,
500 rate_result.status_code,
501 &body,
502 "text/plain",
503 )
504 .await?;
505 return Ok(true); }
507 RateLimitAction::LogOnly => {
508 debug!(
509 correlation_id = %ctx.trace_id,
510 route_id = route_id,
511 "Rate limit exceeded (log only mode)"
512 );
513 }
515 RateLimitAction::Delay => {
516 debug!(
518 correlation_id = %ctx.trace_id,
519 route_id = route_id,
520 "Rate limit delay mode not yet implemented, allowing request"
521 );
522 }
523 }
524 }
525 }
526 }
527
528 let is_websocket_upgrade = session
530 .req_header()
531 .headers
532 .get(http::header::UPGRADE)
533 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
534 .unwrap_or(false);
535
536 if is_websocket_upgrade {
537 ctx.is_websocket_upgrade = true;
538
539 if let Some(ref route_config) = ctx.route_config {
541 if !route_config.websocket {
542 warn!(
543 correlation_id = %ctx.trace_id,
544 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
545 client_ip = %ctx.client_ip,
546 "WebSocket upgrade rejected: not enabled for route"
547 );
548
549 self.metrics.record_blocked_request("websocket_not_enabled");
550
551 let audit_entry = AuditLogEntry::new(
553 &ctx.trace_id,
554 AuditEventType::Blocked,
555 &ctx.method,
556 &ctx.path,
557 &ctx.client_ip,
558 )
559 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
560 .with_action("websocket_rejected")
561 .with_reason("WebSocket not enabled for route");
562 self.log_manager.log_audit(&audit_entry);
563
564 crate::http_helpers::write_error(
566 session,
567 403,
568 "WebSocket not enabled for this route",
569 "text/plain",
570 )
571 .await?;
572 return Ok(true); }
574
575 debug!(
576 correlation_id = %ctx.trace_id,
577 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
578 "WebSocket upgrade request allowed"
579 );
580
581 if route_config.websocket_inspection {
583 let has_compression = session
585 .req_header()
586 .headers
587 .get("Sec-WebSocket-Extensions")
588 .and_then(|v| v.to_str().ok())
589 .map(|s| s.contains("permessage-deflate"))
590 .unwrap_or(false);
591
592 if has_compression {
593 debug!(
594 correlation_id = %ctx.trace_id,
595 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
596 "WebSocket inspection skipped: permessage-deflate negotiated"
597 );
598 ctx.websocket_skip_inspection = true;
599 } else {
600 ctx.websocket_inspection_enabled = true;
601
602 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
604 sentinel_agent_protocol::EventType::WebSocketFrame,
605 );
606
607 debug!(
608 correlation_id = %ctx.trace_id,
609 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
610 agent_count = ctx.websocket_inspection_agents.len(),
611 "WebSocket frame inspection enabled"
612 );
613 }
614 }
615 }
616 }
617
618 if let Some(route_config) = ctx.route_config.clone() {
621 if route_config.service_type == sentinel_config::ServiceType::Static {
622 trace!(
623 correlation_id = %ctx.trace_id,
624 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
625 "Handling static file route"
626 );
627 let route_match = crate::routing::RouteMatch {
629 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
630 config: route_config.clone(),
631 };
632 return self.handle_static_route(session, ctx, &route_match).await;
633 } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
634 trace!(
635 correlation_id = %ctx.trace_id,
636 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
637 builtin_handler = ?route_config.builtin_handler,
638 "Handling builtin route"
639 );
640 let route_match = crate::routing::RouteMatch {
642 route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
643 config: route_config.clone(),
644 };
645 return self.handle_builtin_route(session, ctx, &route_match).await;
646 }
647 }
648
649 if let Some(route_id) = ctx.route_id.clone() {
651 if let Some(validator) = self.validators.get(&route_id).await {
652 trace!(
653 correlation_id = %ctx.trace_id,
654 route_id = %route_id,
655 "Running API schema validation"
656 );
657 if let Some(result) = self
658 .validate_api_request(session, ctx, &route_id, &validator)
659 .await?
660 {
661 debug!(
662 correlation_id = %ctx.trace_id,
663 route_id = %route_id,
664 validation_passed = result,
665 "API validation complete"
666 );
667 return Ok(result);
668 }
669 }
670 }
671
672 let client_addr = session
674 .client_addr()
675 .map(|a| format!("{}", a))
676 .unwrap_or_else(|| "unknown".to_string());
677 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
678
679 let req_header = session.req_header_mut();
680
681 req_header
683 .insert_header("X-Correlation-Id", &ctx.trace_id)
684 .ok();
685 req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
686
687 let config = ctx
689 .config
690 .get_or_insert_with(|| self.config_manager.current());
691
692 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
697 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
698 && header_count > config.limits.max_header_count
699 {
700 warn!(
701 correlation_id = %ctx.trace_id,
702 header_count = header_count,
703 limit = config.limits.max_header_count,
704 "Request blocked: exceeds header count limit"
705 );
706
707 self.metrics.record_blocked_request("header_count_exceeded");
708 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
709 }
710
711 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
713 let total_header_size: usize = req_header
714 .headers
715 .iter()
716 .map(|(k, v)| k.as_str().len() + v.len())
717 .sum();
718
719 if total_header_size > config.limits.max_header_size_bytes {
720 warn!(
721 correlation_id = %ctx.trace_id,
722 header_size = total_header_size,
723 limit = config.limits.max_header_size_bytes,
724 "Request blocked: exceeds header size limit"
725 );
726
727 self.metrics.record_blocked_request("header_size_exceeded");
728 return Err(Error::explain(
729 ErrorType::InternalError,
730 "Headers too large",
731 ));
732 }
733 }
734
735 trace!(
737 correlation_id = %ctx.trace_id,
738 "Processing request through agents"
739 );
740 self.process_agents(session, ctx, &client_addr, client_port)
741 .await?;
742
743 trace!(
744 correlation_id = %ctx.trace_id,
745 "Request filter phase complete, forwarding to upstream"
746 );
747
748 Ok(false) }
750
751 async fn request_body_filter(
758 &self,
759 _session: &mut Session,
760 body: &mut Option<Bytes>,
761 end_of_stream: bool,
762 ctx: &mut Self::CTX,
763 ) -> Result<(), Box<Error>> {
764 use sentinel_config::BodyStreamingMode;
765
766 if ctx.is_websocket_upgrade {
768 if let Some(ref handler) = ctx.websocket_handler {
769 let result = handler.process_client_data(body.take()).await;
770 match result {
771 crate::websocket::ProcessResult::Forward(data) => {
772 *body = data;
773 }
774 crate::websocket::ProcessResult::Close(reason) => {
775 warn!(
776 correlation_id = %ctx.trace_id,
777 code = reason.code,
778 reason = %reason.reason,
779 "WebSocket connection closed by agent (client->server)"
780 );
781 return Err(Error::explain(
783 ErrorType::InternalError,
784 format!("WebSocket closed: {} {}", reason.code, reason.reason),
785 ));
786 }
787 }
788 }
789 return Ok(());
791 }
792
793 let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
795 if chunk_len > 0 {
796 ctx.request_body_bytes += chunk_len as u64;
797
798 trace!(
799 correlation_id = %ctx.trace_id,
800 chunk_size = chunk_len,
801 total_body_bytes = ctx.request_body_bytes,
802 end_of_stream = end_of_stream,
803 streaming_mode = ?ctx.request_body_streaming_mode,
804 "Processing request body chunk"
805 );
806
807 let config = ctx
809 .config
810 .get_or_insert_with(|| self.config_manager.current());
811 if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
812 warn!(
813 correlation_id = %ctx.trace_id,
814 body_bytes = ctx.request_body_bytes,
815 limit = config.limits.max_body_size_bytes,
816 "Request body size limit exceeded"
817 );
818 self.metrics.record_blocked_request("body_size_exceeded");
819 return Err(Error::explain(
820 ErrorType::InternalError,
821 "Request body too large",
822 ));
823 }
824 }
825
826 if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
828 let config = ctx
829 .config
830 .get_or_insert_with(|| self.config_manager.current());
831 let max_inspection_bytes = config
832 .waf
833 .as_ref()
834 .map(|w| w.body_inspection.max_inspection_bytes as u64)
835 .unwrap_or(1024 * 1024);
836
837 match ctx.request_body_streaming_mode {
838 BodyStreamingMode::Stream => {
839 if body.is_some() {
841 self.process_body_chunk_streaming(body, end_of_stream, ctx)
842 .await?;
843 } else if end_of_stream && ctx.agent_needs_more {
844 self.process_body_chunk_streaming(body, end_of_stream, ctx)
846 .await?;
847 }
848 }
849 BodyStreamingMode::Hybrid { buffer_threshold } => {
850 if ctx.body_bytes_inspected < buffer_threshold as u64 {
852 if let Some(ref chunk) = body {
854 let bytes_to_buffer = std::cmp::min(
855 chunk.len(),
856 (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
857 );
858 ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
859 ctx.body_bytes_inspected += bytes_to_buffer as u64;
860
861 if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
863 {
864 self.send_buffered_body_to_agents(
866 end_of_stream && chunk.len() == bytes_to_buffer,
867 ctx,
868 )
869 .await?;
870 ctx.body_buffer.clear();
871
872 if bytes_to_buffer < chunk.len() {
874 let remaining = chunk.slice(bytes_to_buffer..);
875 let mut remaining_body = Some(remaining);
876 self.process_body_chunk_streaming(
877 &mut remaining_body,
878 end_of_stream,
879 ctx,
880 )
881 .await?;
882 }
883 }
884 }
885 } else {
886 self.process_body_chunk_streaming(body, end_of_stream, ctx)
888 .await?;
889 }
890 }
891 BodyStreamingMode::Buffer => {
892 if let Some(ref chunk) = body {
894 if ctx.body_bytes_inspected < max_inspection_bytes {
895 let bytes_to_inspect = std::cmp::min(
896 chunk.len() as u64,
897 max_inspection_bytes - ctx.body_bytes_inspected,
898 ) as usize;
899
900 ctx.body_buffer
901 .extend_from_slice(&chunk[..bytes_to_inspect]);
902 ctx.body_bytes_inspected += bytes_to_inspect as u64;
903
904 trace!(
905 correlation_id = %ctx.trace_id,
906 bytes_inspected = ctx.body_bytes_inspected,
907 max_inspection_bytes = max_inspection_bytes,
908 buffer_size = ctx.body_buffer.len(),
909 "Buffering body for agent inspection"
910 );
911 }
912 }
913
914 let should_send =
916 end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
917 if should_send && !ctx.body_buffer.is_empty() {
918 self.send_buffered_body_to_agents(end_of_stream, ctx)
919 .await?;
920 ctx.body_buffer.clear();
921 }
922 }
923 }
924 }
925
926 if end_of_stream {
927 trace!(
928 correlation_id = %ctx.trace_id,
929 total_body_bytes = ctx.request_body_bytes,
930 bytes_inspected = ctx.body_bytes_inspected,
931 "Request body complete"
932 );
933 }
934
935 Ok(())
936 }
937
938 async fn response_filter(
939 &self,
940 _session: &mut Session,
941 upstream_response: &mut ResponseHeader,
942 ctx: &mut Self::CTX,
943 ) -> Result<(), Box<Error>> {
944 let status = upstream_response.status.as_u16();
945 let duration = ctx.elapsed();
946
947 trace!(
948 correlation_id = %ctx.trace_id,
949 status = status,
950 "Starting response filter phase"
951 );
952
953 if status == 101 && ctx.is_websocket_upgrade {
955 if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
956 let inspector = crate::websocket::WebSocketInspector::with_metrics(
958 self.agent_manager.clone(),
959 ctx.route_id
960 .clone()
961 .unwrap_or_else(|| "unknown".to_string()),
962 ctx.trace_id.clone(),
963 ctx.client_ip.clone(),
964 100, Some(self.metrics.clone()),
966 );
967
968 let handler = crate::websocket::WebSocketHandler::new(
969 std::sync::Arc::new(inspector),
970 1024 * 1024, );
972
973 ctx.websocket_handler = Some(std::sync::Arc::new(handler));
974
975 info!(
976 correlation_id = %ctx.trace_id,
977 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
978 agent_count = ctx.websocket_inspection_agents.len(),
979 "WebSocket upgrade successful, frame inspection enabled"
980 );
981 } else if ctx.websocket_skip_inspection {
982 debug!(
983 correlation_id = %ctx.trace_id,
984 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
985 "WebSocket upgrade successful, inspection skipped (compression negotiated)"
986 );
987 } else {
988 debug!(
989 correlation_id = %ctx.trace_id,
990 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
991 "WebSocket upgrade successful"
992 );
993 }
994 }
995
996 trace!(
998 correlation_id = %ctx.trace_id,
999 "Applying security headers"
1000 );
1001 self.apply_security_headers(upstream_response).ok();
1002
1003 upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1005
1006 if status >= 400 {
1008 trace!(
1009 correlation_id = %ctx.trace_id,
1010 status = status,
1011 "Handling error response"
1012 );
1013 self.handle_error_response(upstream_response, ctx).await?;
1014 }
1015
1016 self.metrics.record_request(
1018 ctx.route_id.as_deref().unwrap_or("unknown"),
1019 &ctx.method,
1020 status,
1021 duration,
1022 );
1023
1024 if let Some(ref upstream) = ctx.upstream {
1026 let success = status < 500;
1027
1028 trace!(
1029 correlation_id = %ctx.trace_id,
1030 upstream = %upstream,
1031 success = success,
1032 status = status,
1033 "Recording passive health check result"
1034 );
1035
1036 self.passive_health.record_outcome(upstream, success).await;
1037
1038 if let Some(pool) = self.upstream_pools.get(upstream).await {
1040 pool.report_result(upstream, success).await;
1041 }
1042
1043 if !success {
1044 warn!(
1045 correlation_id = %ctx.trace_id,
1046 upstream = %upstream,
1047 status = status,
1048 "Upstream returned error status"
1049 );
1050 }
1051 }
1052
1053 if status >= 500 {
1055 error!(
1056 correlation_id = %ctx.trace_id,
1057 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1058 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1059 method = %ctx.method,
1060 path = %ctx.path,
1061 status = status,
1062 duration_ms = duration.as_millis(),
1063 attempts = ctx.upstream_attempts,
1064 "Request completed with server error"
1065 );
1066 } else if status >= 400 {
1067 warn!(
1068 correlation_id = %ctx.trace_id,
1069 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1070 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1071 method = %ctx.method,
1072 path = %ctx.path,
1073 status = status,
1074 duration_ms = duration.as_millis(),
1075 "Request completed with client error"
1076 );
1077 } else {
1078 debug!(
1079 correlation_id = %ctx.trace_id,
1080 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1081 upstream = ctx.upstream.as_deref().unwrap_or("none"),
1082 method = %ctx.method,
1083 path = %ctx.path,
1084 status = status,
1085 duration_ms = duration.as_millis(),
1086 attempts = ctx.upstream_attempts,
1087 "Request completed"
1088 );
1089 }
1090
1091 Ok(())
1092 }
1093
1094 async fn upstream_request_filter(
1097 &self,
1098 _session: &mut Session,
1099 upstream_request: &mut pingora::http::RequestHeader,
1100 ctx: &mut Self::CTX,
1101 ) -> Result<()>
1102 where
1103 Self::CTX: Send + Sync,
1104 {
1105 trace!(
1106 correlation_id = %ctx.trace_id,
1107 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1108 "Applying upstream request modifications"
1109 );
1110
1111 upstream_request
1113 .insert_header("X-Trace-Id", &ctx.trace_id)
1114 .ok();
1115
1116 upstream_request
1118 .insert_header("X-Forwarded-By", "Sentinel")
1119 .ok();
1120
1121 if let Some(ref route_config) = ctx.route_config {
1124 let mods = route_config.policies.request_headers.clone();
1125
1126 for (name, value) in mods.set {
1128 upstream_request.insert_header(name, value).ok();
1129 }
1130
1131 for (name, value) in mods.add {
1133 upstream_request.append_header(name, value).ok();
1134 }
1135
1136 for name in &mods.remove {
1138 upstream_request.remove_header(name);
1139 }
1140
1141 trace!(
1142 correlation_id = %ctx.trace_id,
1143 "Applied request header modifications"
1144 );
1145 }
1146
1147 upstream_request.remove_header("X-Internal-Token");
1149 upstream_request.remove_header("Authorization-Internal");
1150
1151 Ok(())
1152 }
1153
1154 fn response_body_filter(
1160 &self,
1161 _session: &mut Session,
1162 body: &mut Option<Bytes>,
1163 end_of_stream: bool,
1164 ctx: &mut Self::CTX,
1165 ) -> Result<Option<Duration>, Box<Error>> {
1166 if ctx.is_websocket_upgrade {
1169 if let Some(ref handler) = ctx.websocket_handler {
1170 let handler = handler.clone();
1171 let data = body.take();
1172
1173 let result = tokio::task::block_in_place(|| {
1176 tokio::runtime::Handle::current()
1177 .block_on(async { handler.process_server_data(data).await })
1178 });
1179
1180 match result {
1181 crate::websocket::ProcessResult::Forward(data) => {
1182 *body = data;
1183 }
1184 crate::websocket::ProcessResult::Close(reason) => {
1185 warn!(
1186 correlation_id = %ctx.trace_id,
1187 code = reason.code,
1188 reason = %reason.reason,
1189 "WebSocket connection closed by agent (server->client)"
1190 );
1191 let close_frame =
1194 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1195 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1196 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1197 *body = Some(Bytes::from(encoded));
1198 }
1199 }
1200 }
1201 }
1202 return Ok(None);
1204 }
1205
1206 if let Some(ref chunk) = body {
1208 ctx.response_bytes += chunk.len() as u64;
1209
1210 trace!(
1211 correlation_id = %ctx.trace_id,
1212 chunk_size = chunk.len(),
1213 total_response_bytes = ctx.response_bytes,
1214 end_of_stream = end_of_stream,
1215 "Processing response body chunk"
1216 );
1217
1218 if ctx.response_body_inspection_enabled
1222 && !ctx.response_body_inspection_agents.is_empty()
1223 {
1224 let config = ctx
1225 .config
1226 .get_or_insert_with(|| self.config_manager.current());
1227 let max_inspection_bytes = config
1228 .waf
1229 .as_ref()
1230 .map(|w| w.body_inspection.max_inspection_bytes as u64)
1231 .unwrap_or(1024 * 1024);
1232
1233 if ctx.response_body_bytes_inspected < max_inspection_bytes {
1234 let bytes_to_inspect = std::cmp::min(
1235 chunk.len() as u64,
1236 max_inspection_bytes - ctx.response_body_bytes_inspected,
1237 ) as usize;
1238
1239 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1243 ctx.response_body_chunk_index += 1;
1244
1245 trace!(
1246 correlation_id = %ctx.trace_id,
1247 bytes_inspected = ctx.response_body_bytes_inspected,
1248 max_inspection_bytes = max_inspection_bytes,
1249 chunk_index = ctx.response_body_chunk_index,
1250 "Tracking response body for inspection"
1251 );
1252 }
1253 }
1254 }
1255
1256 if end_of_stream {
1257 trace!(
1258 correlation_id = %ctx.trace_id,
1259 total_response_bytes = ctx.response_bytes,
1260 response_bytes_inspected = ctx.response_body_bytes_inspected,
1261 "Response body complete"
1262 );
1263 }
1264
1265 Ok(None)
1267 }
1268
1269 async fn connected_to_upstream(
1272 &self,
1273 _session: &mut Session,
1274 reused: bool,
1275 peer: &HttpPeer,
1276 #[cfg(unix)] _fd: RawFd,
1277 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1278 digest: Option<&Digest>,
1279 ctx: &mut Self::CTX,
1280 ) -> Result<(), Box<Error>> {
1281 ctx.connection_reused = reused;
1283
1284 if reused {
1286 trace!(
1287 correlation_id = %ctx.trace_id,
1288 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1289 peer_address = %peer.address(),
1290 "Reusing existing upstream connection"
1291 );
1292 } else {
1293 debug!(
1294 correlation_id = %ctx.trace_id,
1295 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1296 peer_address = %peer.address(),
1297 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1298 "Established new upstream connection"
1299 );
1300 }
1301
1302 Ok(())
1303 }
1304
1305 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1315 let route_id = match ctx.route_id.as_deref() {
1317 Some(id) => id,
1318 None => {
1319 trace!(
1320 correlation_id = %ctx.trace_id,
1321 "Cache filter: no route ID, skipping cache"
1322 );
1323 return Ok(());
1324 }
1325 };
1326
1327 if !self.cache_manager.is_enabled(route_id) {
1329 trace!(
1330 correlation_id = %ctx.trace_id,
1331 route_id = %route_id,
1332 "Cache disabled for route"
1333 );
1334 return Ok(());
1335 }
1336
1337 if !self
1339 .cache_manager
1340 .is_method_cacheable(route_id, &ctx.method)
1341 {
1342 trace!(
1343 correlation_id = %ctx.trace_id,
1344 route_id = %route_id,
1345 method = %ctx.method,
1346 "Method not cacheable"
1347 );
1348 return Ok(());
1349 }
1350
1351 debug!(
1353 correlation_id = %ctx.trace_id,
1354 route_id = %route_id,
1355 method = %ctx.method,
1356 path = %ctx.path,
1357 "Enabling HTTP caching for request"
1358 );
1359
1360 let storage = get_cache_storage();
1362 let eviction = get_cache_eviction();
1363 let cache_lock = get_cache_lock();
1364
1365 session.cache.enable(
1367 storage,
1368 Some(eviction),
1369 None, Some(cache_lock),
1371 None, );
1373
1374 ctx.cache_eligible = true;
1376
1377 trace!(
1378 correlation_id = %ctx.trace_id,
1379 route_id = %route_id,
1380 cache_enabled = session.cache.enabled(),
1381 "Cache enabled for request"
1382 );
1383
1384 Ok(())
1385 }
1386
1387 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1392 let req_header = session.req_header();
1393 let method = req_header.method.as_str();
1394 let path = req_header.uri.path();
1395 let host = ctx.host.as_deref().unwrap_or("unknown");
1396 let query = req_header.uri.query();
1397
1398 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1400
1401 trace!(
1402 correlation_id = %ctx.trace_id,
1403 cache_key = %key_string,
1404 "Generated cache key"
1405 );
1406
1407 Ok(CacheKey::default(req_header))
1410 }
1411
1412 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1417 session.cache.cache_miss();
1419
1420 if let Some(route_id) = ctx.route_id.as_deref() {
1422 self.cache_manager.stats().record_miss();
1423
1424 trace!(
1425 correlation_id = %ctx.trace_id,
1426 route_id = %route_id,
1427 path = %ctx.path,
1428 "Cache miss"
1429 );
1430 }
1431 }
1432
1433 async fn cache_hit_filter(
1439 &self,
1440 session: &mut Session,
1441 meta: &CacheMeta,
1442 _hit_handler: &mut HitHandler,
1443 is_fresh: bool,
1444 ctx: &mut Self::CTX,
1445 ) -> Result<Option<ForcedInvalidationKind>>
1446 where
1447 Self::CTX: Send + Sync,
1448 {
1449 let req_header = session.req_header();
1451 let method = req_header.method.as_str();
1452 let path = req_header.uri.path();
1453 let host = req_header.uri.host().unwrap_or("localhost");
1454 let query = req_header.uri.query();
1455
1456 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1458
1459 if self.cache_manager.should_invalidate(&cache_key) {
1461 info!(
1462 correlation_id = %ctx.trace_id,
1463 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1464 cache_key = %cache_key,
1465 "Cache entry invalidated by purge request"
1466 );
1467 return Ok(Some(ForcedInvalidationKind::ForceExpired));
1469 }
1470
1471 if is_fresh {
1473 self.cache_manager.stats().record_hit();
1474
1475 debug!(
1476 correlation_id = %ctx.trace_id,
1477 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1478 is_fresh = is_fresh,
1479 "Cache hit (fresh)"
1480 );
1481 } else {
1482 trace!(
1483 correlation_id = %ctx.trace_id,
1484 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1485 is_fresh = is_fresh,
1486 "Cache hit (stale)"
1487 );
1488 }
1489
1490 Ok(None)
1492 }
1493
1494 fn response_cache_filter(
1499 &self,
1500 _session: &Session,
1501 resp: &ResponseHeader,
1502 ctx: &mut Self::CTX,
1503 ) -> Result<RespCacheable> {
1504 let route_id = match ctx.route_id.as_deref() {
1505 Some(id) => id,
1506 None => {
1507 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1508 "no_route",
1509 )));
1510 }
1511 };
1512
1513 if !self.cache_manager.is_enabled(route_id) {
1515 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1516 "disabled",
1517 )));
1518 }
1519
1520 let status = resp.status.as_u16();
1521
1522 if !self.cache_manager.is_status_cacheable(route_id, status) {
1524 trace!(
1525 correlation_id = %ctx.trace_id,
1526 route_id = %route_id,
1527 status = status,
1528 "Status code not cacheable"
1529 );
1530 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1531 }
1532
1533 if let Some(cache_control) = resp.headers.get("cache-control") {
1535 if let Ok(cc_str) = cache_control.to_str() {
1536 if crate::cache::CacheManager::is_no_cache(cc_str) {
1537 trace!(
1538 correlation_id = %ctx.trace_id,
1539 route_id = %route_id,
1540 cache_control = %cc_str,
1541 "Response has no-cache directive"
1542 );
1543 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1544 }
1545 }
1546 }
1547
1548 let cache_control = resp
1550 .headers
1551 .get("cache-control")
1552 .and_then(|v| v.to_str().ok());
1553 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1554
1555 if ttl.is_zero() {
1556 trace!(
1557 correlation_id = %ctx.trace_id,
1558 route_id = %route_id,
1559 "TTL is zero, not caching"
1560 );
1561 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1562 }
1563
1564 let config = self
1566 .cache_manager
1567 .get_route_config(route_id)
1568 .unwrap_or_default();
1569
1570 let now = std::time::SystemTime::now();
1572 let fresh_until = now + ttl;
1573
1574 let header = resp.clone();
1576
1577 let cache_meta = CacheMeta::new(
1579 fresh_until,
1580 now,
1581 config.stale_while_revalidate_secs as u32,
1582 config.stale_if_error_secs as u32,
1583 header,
1584 );
1585
1586 self.cache_manager.stats().record_store();
1588
1589 debug!(
1590 correlation_id = %ctx.trace_id,
1591 route_id = %route_id,
1592 status = status,
1593 ttl_secs = ttl.as_secs(),
1594 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1595 stale_if_error_secs = config.stale_if_error_secs,
1596 "Caching response"
1597 );
1598
1599 Ok(RespCacheable::Cacheable(cache_meta))
1600 }
1601
1602 fn should_serve_stale(
1606 &self,
1607 _session: &mut Session,
1608 ctx: &mut Self::CTX,
1609 error: Option<&Error>,
1610 ) -> bool {
1611 let route_id = match ctx.route_id.as_deref() {
1612 Some(id) => id,
1613 None => return false,
1614 };
1615
1616 let config = match self.cache_manager.get_route_config(route_id) {
1618 Some(c) => c,
1619 None => return false,
1620 };
1621
1622 if let Some(e) = error {
1624 if e.esource() == &pingora::ErrorSource::Upstream {
1626 debug!(
1627 correlation_id = %ctx.trace_id,
1628 route_id = %route_id,
1629 error = %e,
1630 stale_if_error_secs = config.stale_if_error_secs,
1631 "Considering stale-if-error"
1632 );
1633 return config.stale_if_error_secs > 0;
1634 }
1635 }
1636
1637 if error.is_none() && config.stale_while_revalidate_secs > 0 {
1639 trace!(
1640 correlation_id = %ctx.trace_id,
1641 route_id = %route_id,
1642 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1643 "Allowing stale-while-revalidate"
1644 );
1645 return true;
1646 }
1647
1648 false
1649 }
1650
1651 fn range_header_filter(
1661 &self,
1662 session: &mut Session,
1663 response: &mut ResponseHeader,
1664 ctx: &mut Self::CTX,
1665 ) -> pingora_proxy::RangeType
1666 where
1667 Self::CTX: Send + Sync,
1668 {
1669 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1671 matches!(
1673 config.service_type,
1674 sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1675 )
1676 });
1677
1678 if !supports_range {
1679 trace!(
1680 correlation_id = %ctx.trace_id,
1681 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1682 "Range request not supported for this route type"
1683 );
1684 return pingora_proxy::RangeType::None;
1685 }
1686
1687 let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1689
1690 match &range_type {
1691 pingora_proxy::RangeType::None => {
1692 trace!(
1693 correlation_id = %ctx.trace_id,
1694 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1695 "No range request or not applicable"
1696 );
1697 }
1698 pingora_proxy::RangeType::Single(range) => {
1699 trace!(
1700 correlation_id = %ctx.trace_id,
1701 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1702 range_start = range.start,
1703 range_end = range.end,
1704 "Processing single-range request"
1705 );
1706 }
1707 pingora_proxy::RangeType::Multi(multi) => {
1708 trace!(
1709 correlation_id = %ctx.trace_id,
1710 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1711 range_count = multi.ranges.len(),
1712 "Processing multi-range request"
1713 );
1714 }
1715 pingora_proxy::RangeType::Invalid => {
1716 debug!(
1717 correlation_id = %ctx.trace_id,
1718 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1719 "Invalid range header"
1720 );
1721 }
1722 }
1723
1724 range_type
1725 }
1726
1727 async fn fail_to_proxy(
1730 &self,
1731 _session: &mut Session,
1732 e: &Error,
1733 ctx: &mut Self::CTX,
1734 ) -> pingora_proxy::FailToProxy
1735 where
1736 Self::CTX: Send + Sync,
1737 {
1738 let error_code = match e.etype() {
1739 ErrorType::ConnectRefused => 503,
1741 ErrorType::ConnectTimedout => 504,
1742 ErrorType::ConnectNoRoute => 502,
1743
1744 ErrorType::ReadTimedout => 504,
1746 ErrorType::WriteTimedout => 504,
1747
1748 ErrorType::TLSHandshakeFailure => 502,
1750 ErrorType::InvalidCert => 502,
1751
1752 ErrorType::InvalidHTTPHeader => 400,
1754 ErrorType::H2Error => 502,
1755
1756 ErrorType::ConnectProxyFailure => 502,
1758 ErrorType::ConnectionClosed => 502,
1759
1760 ErrorType::InternalError => 500,
1762
1763 _ => 502,
1765 };
1766
1767 error!(
1768 correlation_id = %ctx.trace_id,
1769 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1770 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1771 error_type = ?e.etype(),
1772 error = %e,
1773 error_code = error_code,
1774 "Proxy error occurred"
1775 );
1776
1777 self.metrics
1779 .record_blocked_request(&format!("proxy_error_{}", error_code));
1780
1781 pingora_proxy::FailToProxy {
1784 error_code,
1785 can_reuse_downstream: error_code < 500,
1786 }
1787 }
1788
1789 fn error_while_proxy(
1795 &self,
1796 peer: &HttpPeer,
1797 session: &mut Session,
1798 e: Box<Error>,
1799 ctx: &mut Self::CTX,
1800 client_reused: bool,
1801 ) -> Box<Error> {
1802 let error_type = e.etype().clone();
1803 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
1804
1805 let is_retryable = matches!(
1807 error_type,
1808 ErrorType::ConnectTimedout
1809 | ErrorType::ReadTimedout
1810 | ErrorType::WriteTimedout
1811 | ErrorType::ConnectionClosed
1812 | ErrorType::ConnectRefused
1813 );
1814
1815 warn!(
1817 correlation_id = %ctx.trace_id,
1818 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1819 upstream = %upstream_id,
1820 peer_address = %peer.address(),
1821 error_type = ?error_type,
1822 error = %e,
1823 client_reused = client_reused,
1824 is_retryable = is_retryable,
1825 "Error during proxy operation"
1826 );
1827
1828 let peer_address = peer.address().to_string();
1831 let upstream_pools = self.upstream_pools.clone();
1832 let upstream_id_owned = upstream_id.to_string();
1833 tokio::spawn(async move {
1834 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
1835 pool.report_result(&peer_address, false).await;
1836 }
1837 });
1838
1839 self.metrics
1841 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
1842
1843 let mut enhanced_error = e.more_context(format!(
1845 "Upstream: {}, Peer: {}, Attempts: {}",
1846 upstream_id,
1847 peer.address(),
1848 ctx.upstream_attempts
1849 ));
1850
1851 if is_retryable {
1856 let can_retry = if client_reused {
1857 !session.as_ref().retry_buffer_truncated()
1859 } else {
1860 true
1862 };
1863
1864 enhanced_error.retry.decide_reuse(can_retry);
1865
1866 if can_retry {
1867 debug!(
1868 correlation_id = %ctx.trace_id,
1869 upstream = %upstream_id,
1870 error_type = ?error_type,
1871 "Error is retryable, will attempt retry"
1872 );
1873 }
1874 } else {
1875 enhanced_error.retry.decide_reuse(false);
1877 }
1878
1879 enhanced_error
1880 }
1881
1882 async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
1883 self.reload_coordinator.dec_requests();
1885
1886 let duration = ctx.elapsed();
1887
1888 let status = session
1890 .response_written()
1891 .map(|r| r.status.as_u16())
1892 .unwrap_or(0);
1893
1894 if self.log_manager.access_log_enabled() {
1896 let access_entry = AccessLogEntry {
1897 timestamp: chrono::Utc::now().to_rfc3339(),
1898 trace_id: ctx.trace_id.clone(),
1899 method: ctx.method.clone(),
1900 path: ctx.path.clone(),
1901 query: ctx.query.clone(),
1902 protocol: "HTTP/1.1".to_string(),
1903 status,
1904 body_bytes: ctx.response_bytes,
1905 duration_ms: duration.as_millis() as u64,
1906 client_ip: ctx.client_ip.clone(),
1907 user_agent: ctx.user_agent.clone(),
1908 referer: ctx.referer.clone(),
1909 host: ctx.host.clone(),
1910 route_id: ctx.route_id.clone(),
1911 upstream: ctx.upstream.clone(),
1912 upstream_attempts: ctx.upstream_attempts,
1913 instance_id: self.app_state.instance_id.clone(),
1914 };
1915 self.log_manager.log_access(&access_entry);
1916 }
1917
1918 if tracing::enabled!(tracing::Level::DEBUG) {
1920 debug!(
1921 trace_id = %ctx.trace_id,
1922 method = %ctx.method,
1923 path = %ctx.path,
1924 route_id = ?ctx.route_id,
1925 upstream = ?ctx.upstream,
1926 status = status,
1927 duration_ms = duration.as_millis() as u64,
1928 upstream_attempts = ctx.upstream_attempts,
1929 error = ?_error.map(|e| e.to_string()),
1930 "Request completed"
1931 );
1932 }
1933
1934 if ctx.is_websocket_upgrade && status == 101 {
1936 info!(
1937 trace_id = %ctx.trace_id,
1938 route_id = ?ctx.route_id,
1939 upstream = ?ctx.upstream,
1940 client_ip = %ctx.client_ip,
1941 "WebSocket connection established"
1942 );
1943 }
1944 }
1945}
1946
1947impl SentinelProxy {
1952 async fn process_body_chunk_streaming(
1954 &self,
1955 body: &mut Option<Bytes>,
1956 end_of_stream: bool,
1957 ctx: &mut RequestContext,
1958 ) -> Result<(), Box<Error>> {
1959 let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
1961 let chunk_index = ctx.request_body_chunk_index;
1962 ctx.request_body_chunk_index += 1;
1963 ctx.body_bytes_inspected += chunk_data.len() as u64;
1964
1965 debug!(
1966 correlation_id = %ctx.trace_id,
1967 chunk_index = chunk_index,
1968 chunk_size = chunk_data.len(),
1969 end_of_stream = end_of_stream,
1970 "Streaming body chunk to agents"
1971 );
1972
1973 let agent_ctx = crate::agents::AgentCallContext {
1975 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
1976 metadata: sentinel_agent_protocol::RequestMetadata {
1977 correlation_id: ctx.trace_id.clone(),
1978 request_id: ctx.trace_id.clone(),
1979 client_ip: ctx.client_ip.clone(),
1980 client_port: 0,
1981 server_name: ctx.host.clone(),
1982 protocol: "HTTP/1.1".to_string(),
1983 tls_version: None,
1984 tls_cipher: None,
1985 route_id: ctx.route_id.clone(),
1986 upstream_id: ctx.upstream.clone(),
1987 timestamp: chrono::Utc::now().to_rfc3339(),
1988 },
1989 route_id: ctx.route_id.clone(),
1990 upstream_id: ctx.upstream.clone(),
1991 request_body: None, response_body: None,
1993 };
1994
1995 let agent_ids = ctx.body_inspection_agents.clone();
1996 let total_size = None; match self
1999 .agent_manager
2000 .process_request_body_streaming(
2001 &agent_ctx,
2002 &chunk_data,
2003 end_of_stream,
2004 chunk_index,
2005 ctx.body_bytes_inspected as usize,
2006 total_size,
2007 &agent_ids,
2008 )
2009 .await
2010 {
2011 Ok(decision) => {
2012 ctx.agent_needs_more = decision.needs_more;
2014
2015 if let Some(ref mutation) = decision.request_body_mutation {
2017 if !mutation.is_pass_through() {
2018 if mutation.is_drop() {
2019 *body = None;
2021 trace!(
2022 correlation_id = %ctx.trace_id,
2023 chunk_index = chunk_index,
2024 "Agent dropped body chunk"
2025 );
2026 } else if let Some(ref new_data) = mutation.data {
2027 *body = Some(Bytes::from(new_data.clone()));
2029 trace!(
2030 correlation_id = %ctx.trace_id,
2031 chunk_index = chunk_index,
2032 original_size = chunk_data.len(),
2033 new_size = new_data.len(),
2034 "Agent mutated body chunk"
2035 );
2036 }
2037 }
2038 }
2039
2040 if !decision.needs_more && !decision.is_allow() {
2042 warn!(
2043 correlation_id = %ctx.trace_id,
2044 action = ?decision.action,
2045 "Agent blocked request body"
2046 );
2047 self.metrics.record_blocked_request("agent_body_inspection");
2048
2049 let (status, message) = match &decision.action {
2050 crate::agents::AgentAction::Block { status, body, .. } => (
2051 *status,
2052 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2053 ),
2054 _ => (403, "Forbidden".to_string()),
2055 };
2056
2057 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2058 }
2059
2060 trace!(
2061 correlation_id = %ctx.trace_id,
2062 needs_more = decision.needs_more,
2063 "Agent processed body chunk"
2064 );
2065 }
2066 Err(e) => {
2067 let fail_closed = ctx
2068 .route_config
2069 .as_ref()
2070 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2071 .unwrap_or(false);
2072
2073 if fail_closed {
2074 error!(
2075 correlation_id = %ctx.trace_id,
2076 error = %e,
2077 "Agent streaming body inspection failed, blocking (fail-closed)"
2078 );
2079 return Err(Error::explain(
2080 ErrorType::HTTPStatus(503),
2081 "Service unavailable",
2082 ));
2083 } else {
2084 warn!(
2085 correlation_id = %ctx.trace_id,
2086 error = %e,
2087 "Agent streaming body inspection failed, allowing (fail-open)"
2088 );
2089 }
2090 }
2091 }
2092
2093 Ok(())
2094 }
2095
2096 async fn send_buffered_body_to_agents(
2098 &self,
2099 end_of_stream: bool,
2100 ctx: &mut RequestContext,
2101 ) -> Result<(), Box<Error>> {
2102 debug!(
2103 correlation_id = %ctx.trace_id,
2104 buffer_size = ctx.body_buffer.len(),
2105 end_of_stream = end_of_stream,
2106 agent_count = ctx.body_inspection_agents.len(),
2107 "Sending buffered body to agents for inspection"
2108 );
2109
2110 let agent_ctx = crate::agents::AgentCallContext {
2111 correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2112 metadata: sentinel_agent_protocol::RequestMetadata {
2113 correlation_id: ctx.trace_id.clone(),
2114 request_id: ctx.trace_id.clone(),
2115 client_ip: ctx.client_ip.clone(),
2116 client_port: 0,
2117 server_name: ctx.host.clone(),
2118 protocol: "HTTP/1.1".to_string(),
2119 tls_version: None,
2120 tls_cipher: None,
2121 route_id: ctx.route_id.clone(),
2122 upstream_id: ctx.upstream.clone(),
2123 timestamp: chrono::Utc::now().to_rfc3339(),
2124 },
2125 route_id: ctx.route_id.clone(),
2126 upstream_id: ctx.upstream.clone(),
2127 request_body: Some(ctx.body_buffer.clone()),
2128 response_body: None,
2129 };
2130
2131 let agent_ids = ctx.body_inspection_agents.clone();
2132 match self
2133 .agent_manager
2134 .process_request_body(&agent_ctx, &ctx.body_buffer, end_of_stream, &agent_ids)
2135 .await
2136 {
2137 Ok(decision) => {
2138 if !decision.is_allow() {
2139 warn!(
2140 correlation_id = %ctx.trace_id,
2141 action = ?decision.action,
2142 "Agent blocked request body"
2143 );
2144 self.metrics.record_blocked_request("agent_body_inspection");
2145
2146 let (status, message) = match &decision.action {
2147 crate::agents::AgentAction::Block { status, body, .. } => (
2148 *status,
2149 body.clone().unwrap_or_else(|| "Blocked".to_string()),
2150 ),
2151 _ => (403, "Forbidden".to_string()),
2152 };
2153
2154 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2155 }
2156
2157 trace!(
2158 correlation_id = %ctx.trace_id,
2159 "Agent allowed request body"
2160 );
2161 }
2162 Err(e) => {
2163 let fail_closed = ctx
2164 .route_config
2165 .as_ref()
2166 .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2167 .unwrap_or(false);
2168
2169 if fail_closed {
2170 error!(
2171 correlation_id = %ctx.trace_id,
2172 error = %e,
2173 "Agent body inspection failed, blocking (fail-closed)"
2174 );
2175 return Err(Error::explain(
2176 ErrorType::HTTPStatus(503),
2177 "Service unavailable",
2178 ));
2179 } else {
2180 warn!(
2181 correlation_id = %ctx.trace_id,
2182 error = %e,
2183 "Agent body inspection failed, allowing (fail-open)"
2184 );
2185 }
2186 }
2187 }
2188
2189 Ok(())
2190 }
2191}