// sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module implements Pingora's `ProxyHttp` trait for `SentinelProxy`, which defines
//! the core request/response lifecycle handling.
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
23use crate::rate_limit::HeaderAccessor;
24use crate::routing::RequestInfo;
25
26use super::context::RequestContext;
27use super::SentinelProxy;
28
29/// Helper type for rate limiting when we don't need header access
30struct NoHeaderAccessor;
31impl HeaderAccessor for NoHeaderAccessor {
32    fn get_header(&self, _name: &str) -> Option<String> {
33        None
34    }
35}
36
37#[async_trait]
38impl ProxyHttp for SentinelProxy {
39    type CTX = RequestContext;
40
41    fn new_ctx(&self) -> Self::CTX {
42        RequestContext::new()
43    }
44
45    fn fail_to_connect(
46        &self,
47        _session: &mut Session,
48        peer: &HttpPeer,
49        ctx: &mut Self::CTX,
50        e: Box<Error>,
51    ) -> Box<Error> {
52        error!(
53            correlation_id = %ctx.trace_id,
54            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
55            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
56            peer_address = %peer.address(),
57            error = %e,
58            "Failed to connect to upstream peer"
59        );
60        // Custom error pages are handled in response_filter
61        e
62    }
63
64    /// Early request filter - runs before upstream selection
65    /// Used to handle builtin routes that don't need an upstream connection
66    async fn early_request_filter(
67        &self,
68        session: &mut Session,
69        ctx: &mut Self::CTX,
70    ) -> Result<(), Box<Error>> {
71        // Extract request info for routing
72        let req_header = session.req_header();
73        let method = req_header.method.as_str();
74        let path = req_header.uri.path();
75        let host = req_header
76            .headers
77            .get("host")
78            .and_then(|h| h.to_str().ok())
79            .unwrap_or("");
80
81        ctx.method = method.to_string();
82        ctx.path = path.to_string();
83        ctx.host = Some(host.to_string());
84
85        // Match route to determine service type
86        let route_match = {
87            let route_matcher = self.route_matcher.read();
88            let request_info = RequestInfo::new(method, path, host);
89            match route_matcher.match_request(&request_info) {
90                Some(m) => m,
91                None => return Ok(()), // No matching route, let upstream_peer handle it
92            }
93        };
94
95        ctx.trace_id = self.get_trace_id(session);
96        ctx.route_id = Some(route_match.route_id.to_string());
97        ctx.route_config = Some(route_match.config.clone());
98
99        // Parse incoming W3C trace context if present
100        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
101            if let Ok(s) = traceparent.to_str() {
102                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
103            }
104        }
105
106        // Start OpenTelemetry request span if tracing is enabled
107        if let Some(tracer) = crate::otel::get_tracer() {
108            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
109        }
110
111        // Check if this is a builtin handler route
112        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
113            trace!(
114                correlation_id = %ctx.trace_id,
115                route_id = %route_match.route_id,
116                builtin_handler = ?route_match.config.builtin_handler,
117                "Handling builtin route in early_request_filter"
118            );
119
120            // Handle the builtin route directly
121            let handled = self
122                .handle_builtin_route(session, ctx, &route_match)
123                .await?;
124
125            if handled {
126                // Return error to signal that request is complete (Pingora will not continue)
127                return Err(Error::explain(
128                    ErrorType::InternalError,
129                    "Builtin handler complete",
130                ));
131            }
132        }
133
134        Ok(())
135    }
136
137    async fn upstream_peer(
138        &self,
139        session: &mut Session,
140        ctx: &mut Self::CTX,
141    ) -> Result<Box<HttpPeer>, Box<Error>> {
142        // Track active request
143        self.reload_coordinator.inc_requests();
144
145        // Cache global config once per request (avoids repeated Arc clones)
146        if ctx.config.is_none() {
147            ctx.config = Some(self.config_manager.current());
148        }
149
150        // Cache client address for logging if not already set
151        if ctx.client_ip.is_empty() {
152            ctx.client_ip = session
153                .client_addr()
154                .map(|a| a.to_string())
155                .unwrap_or_else(|| "unknown".to_string());
156        }
157
158        let req_header = session.req_header();
159
160        // Cache request info for access logging if not already set
161        if ctx.method.is_empty() {
162            ctx.method = req_header.method.to_string();
163            ctx.path = req_header.uri.path().to_string();
164            ctx.query = req_header.uri.query().map(|q| q.to_string());
165            ctx.host = req_header
166                .headers
167                .get("host")
168                .and_then(|v| v.to_str().ok())
169                .map(|s| s.to_string());
170        }
171        ctx.user_agent = req_header
172            .headers
173            .get("user-agent")
174            .and_then(|v| v.to_str().ok())
175            .map(|s| s.to_string());
176        ctx.referer = req_header
177            .headers
178            .get("referer")
179            .and_then(|v| v.to_str().ok())
180            .map(|s| s.to_string());
181
182        trace!(
183            correlation_id = %ctx.trace_id,
184            client_ip = %ctx.client_ip,
185            "Request received, initializing context"
186        );
187
188        // Use cached route info if already set by early_request_filter
189        let route_match = if let Some(ref route_config) = ctx.route_config {
190            let route_id = ctx.route_id.as_deref().unwrap_or("");
191            crate::routing::RouteMatch {
192                route_id: sentinel_common::RouteId::new(route_id),
193                config: route_config.clone(),
194            }
195        } else {
196            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
197            let (match_result, route_duration) = {
198                let route_matcher = self.route_matcher.read();
199                let host = ctx.host.as_deref().unwrap_or("");
200
201                // Build request info (zero-copy for common case)
202                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
203
204                // Only build headers HashMap if any route needs header matching
205                if route_matcher.needs_headers() {
206                    request_info = request_info
207                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
208                }
209
210                // Only parse query params if any route needs query param matching
211                if route_matcher.needs_query_params() {
212                    request_info =
213                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
214                }
215
216                trace!(
217                    correlation_id = %ctx.trace_id,
218                    method = %request_info.method,
219                    path = %request_info.path,
220                    host = %request_info.host,
221                    "Built request info for route matching"
222                );
223
224                let route_start = std::time::Instant::now();
225                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
226                    warn!(
227                        correlation_id = %ctx.trace_id,
228                        method = %request_info.method,
229                        path = %request_info.path,
230                        host = %request_info.host,
231                        "No matching route found for request"
232                    );
233                    Error::explain(ErrorType::InternalError, "No matching route found")
234                })?;
235                let route_duration = route_start.elapsed();
236                // Lock is dropped here when block ends
237                (route_match, route_duration)
238            };
239
240            ctx.route_id = Some(match_result.route_id.to_string());
241            ctx.route_config = Some(match_result.config.clone());
242
243            // Set trace_id if not already set by early_request_filter
244            if ctx.trace_id.is_empty() {
245                ctx.trace_id = self.get_trace_id(session);
246
247                // Parse incoming W3C trace context if present
248                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
249                {
250                    if let Ok(s) = traceparent.to_str() {
251                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
252                    }
253                }
254
255                // Start OpenTelemetry request span if tracing is enabled
256                if let Some(tracer) = crate::otel::get_tracer() {
257                    ctx.otel_span = Some(tracer.start_span(
258                        &ctx.method,
259                        &ctx.path,
260                        ctx.trace_context.as_ref(),
261                    ));
262                }
263            }
264
265            trace!(
266                correlation_id = %ctx.trace_id,
267                route_id = %match_result.route_id,
268                route_duration_us = route_duration.as_micros(),
269                service_type = ?match_result.config.service_type,
270                "Route matched"
271            );
272            match_result
273        };
274
275        // Check if this is a builtin handler route (no upstream needed)
276        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
277            trace!(
278                correlation_id = %ctx.trace_id,
279                route_id = %route_match.route_id,
280                builtin_handler = ?route_match.config.builtin_handler,
281                "Route type is builtin, skipping upstream"
282            );
283            // Mark as builtin route for later processing in request_filter
284            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
285            // Return error to skip upstream connection for builtin routes
286            return Err(Error::explain(
287                ErrorType::InternalError,
288                "Builtin handler handled in request_filter",
289            ));
290        }
291
292        // Check if this is a static file route
293        if route_match.config.service_type == sentinel_config::ServiceType::Static {
294            trace!(
295                correlation_id = %ctx.trace_id,
296                route_id = %route_match.route_id,
297                "Route type is static, checking for static server"
298            );
299            // Static routes don't need an upstream
300            if self
301                .static_servers
302                .get(route_match.route_id.as_str())
303                .await
304                .is_some()
305            {
306                // Mark this as a static route for later processing
307                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
308                info!(
309                    correlation_id = %ctx.trace_id,
310                    route_id = %route_match.route_id,
311                    path = %ctx.path,
312                    "Serving static file"
313                );
314                // Return error to avoid upstream connection for static routes
315                return Err(Error::explain(
316                    ErrorType::InternalError,
317                    "Static file serving handled in request_filter",
318                ));
319            }
320        }
321
322        // Regular route with upstream
323        if let Some(ref upstream) = route_match.config.upstream {
324            ctx.upstream = Some(upstream.clone());
325            trace!(
326                correlation_id = %ctx.trace_id,
327                route_id = %route_match.route_id,
328                upstream = %upstream,
329                "Upstream configured for route"
330            );
331        } else {
332            error!(
333                correlation_id = %ctx.trace_id,
334                route_id = %route_match.route_id,
335                "Route has no upstream configured"
336            );
337            return Err(Error::explain(
338                ErrorType::InternalError,
339                format!(
340                    "Route '{}' has no upstream configured",
341                    route_match.route_id
342                ),
343            ));
344        }
345
346        debug!(
347            correlation_id = %ctx.trace_id,
348            route_id = %route_match.route_id,
349            upstream = ?ctx.upstream,
350            method = %req_header.method,
351            path = %req_header.uri.path(),
352            host = ctx.host.as_deref().unwrap_or("-"),
353            client_ip = %ctx.client_ip,
354            "Processing request"
355        );
356
357        // Get upstream pool (skip for static routes)
358        if ctx
359            .upstream
360            .as_ref()
361            .is_some_and(|u| u.starts_with("_static_"))
362        {
363            // Static routes are handled in request_filter, should not reach here
364            return Err(Error::explain(
365                ErrorType::InternalError,
366                "Static route should be handled in request_filter",
367            ));
368        }
369
370        let upstream_name = ctx
371            .upstream
372            .as_ref()
373            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
374
375        trace!(
376            correlation_id = %ctx.trace_id,
377            upstream = %upstream_name,
378            "Looking up upstream pool"
379        );
380
381        let pool = self
382            .upstream_pools
383            .get(upstream_name)
384            .await
385            .ok_or_else(|| {
386                error!(
387                    correlation_id = %ctx.trace_id,
388                    upstream = %upstream_name,
389                    "Upstream pool not found"
390                );
391                Error::explain(
392                    ErrorType::InternalError,
393                    format!("Upstream pool '{}' not found", upstream_name),
394                )
395            })?;
396
397        // Select peer from pool with retries
398        let max_retries = route_match
399            .config
400            .retry_policy
401            .as_ref()
402            .map(|r| r.max_attempts)
403            .unwrap_or(1);
404
405        trace!(
406            correlation_id = %ctx.trace_id,
407            upstream = %upstream_name,
408            max_retries = max_retries,
409            "Starting upstream peer selection"
410        );
411
412        let mut last_error = None;
413        let selection_start = std::time::Instant::now();
414
415        for attempt in 1..=max_retries {
416            ctx.upstream_attempts = attempt;
417
418            trace!(
419                correlation_id = %ctx.trace_id,
420                upstream = %upstream_name,
421                attempt = attempt,
422                max_retries = max_retries,
423                "Attempting to select upstream peer"
424            );
425
426            match pool.select_peer(None).await {
427                Ok(peer) => {
428                    let selection_duration = selection_start.elapsed();
429                    // Store selected peer address for feedback reporting in logging()
430                    let peer_addr = peer.address().to_string();
431                    ctx.selected_upstream_address = Some(peer_addr.clone());
432                    debug!(
433                        correlation_id = %ctx.trace_id,
434                        upstream = %upstream_name,
435                        peer_address = %peer_addr,
436                        attempt = attempt,
437                        selection_duration_us = selection_duration.as_micros(),
438                        "Selected upstream peer"
439                    );
440                    return Ok(Box::new(peer));
441                }
442                Err(e) => {
443                    warn!(
444                        correlation_id = %ctx.trace_id,
445                        upstream = %upstream_name,
446                        attempt = attempt,
447                        max_retries = max_retries,
448                        error = %e,
449                        "Failed to select upstream peer"
450                    );
451                    last_error = Some(e);
452
453                    if attempt < max_retries {
454                        // Exponential backoff (using pingora-timeout for efficiency)
455                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
456                        trace!(
457                            correlation_id = %ctx.trace_id,
458                            backoff_ms = backoff.as_millis(),
459                            "Backing off before retry"
460                        );
461                        sleep(backoff).await;
462                    }
463                }
464            }
465        }
466
467        let selection_duration = selection_start.elapsed();
468        error!(
469            correlation_id = %ctx.trace_id,
470            upstream = %upstream_name,
471            attempts = max_retries,
472            selection_duration_ms = selection_duration.as_millis(),
473            last_error = ?last_error,
474            "All upstream selection attempts failed"
475        );
476
477        Err(Error::explain(
478            ErrorType::InternalError,
479            format!("All upstream attempts failed: {:?}", last_error),
480        ))
481    }
482
483    async fn request_filter(
484        &self,
485        session: &mut Session,
486        ctx: &mut Self::CTX,
487    ) -> Result<bool, Box<Error>> {
488        trace!(
489            correlation_id = %ctx.trace_id,
490            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
491            "Starting request filter phase"
492        );
493
494        // Check rate limiting early (before other processing)
495        // Fast path: skip if no rate limiting is configured for this route
496        if let Some(route_id) = ctx.route_id.as_deref() {
497            if self.rate_limit_manager.has_route_limiter(route_id) {
498                let rate_result = self.rate_limit_manager.check(
499                    route_id,
500                    &ctx.client_ip,
501                    &ctx.path,
502                    Option::<&NoHeaderAccessor>::None,
503                );
504
505                // Store rate limit info for response headers (even if allowed)
506                if rate_result.limit > 0 {
507                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
508                        limit: rate_result.limit,
509                        remaining: rate_result.remaining,
510                        reset_at: rate_result.reset_at,
511                    });
512                }
513
514                if !rate_result.allowed {
515                    use sentinel_config::RateLimitAction;
516
517                    match rate_result.action {
518                        RateLimitAction::Reject => {
519                            warn!(
520                                correlation_id = %ctx.trace_id,
521                                route_id = route_id,
522                                client_ip = %ctx.client_ip,
523                                limiter = %rate_result.limiter,
524                                limit = rate_result.limit,
525                                remaining = rate_result.remaining,
526                                "Request rate limited"
527                            );
528                            self.metrics.record_blocked_request("rate_limited");
529
530                            // Audit log the rate limit
531                            let audit_entry = AuditLogEntry::rate_limited(
532                                &ctx.trace_id,
533                                &ctx.method,
534                                &ctx.path,
535                                &ctx.client_ip,
536                                &rate_result.limiter,
537                            )
538                            .with_route_id(route_id)
539                            .with_status_code(rate_result.status_code);
540                            self.log_manager.log_audit(&audit_entry);
541
542                            // Send rate limit response with headers
543                            let body = rate_result
544                                .message
545                                .unwrap_or_else(|| "Rate limit exceeded".to_string());
546
547                            // Build response with rate limit headers
548                            let retry_after = rate_result.reset_at.saturating_sub(
549                                std::time::SystemTime::now()
550                                    .duration_since(std::time::UNIX_EPOCH)
551                                    .unwrap_or_default()
552                                    .as_secs(),
553                            );
554                            crate::http_helpers::write_rate_limit_error(
555                                session,
556                                rate_result.status_code,
557                                &body,
558                                rate_result.limit,
559                                rate_result.remaining,
560                                rate_result.reset_at,
561                                retry_after,
562                            )
563                            .await?;
564                            return Ok(true); // Request complete, don't continue
565                        }
566                        RateLimitAction::LogOnly => {
567                            debug!(
568                                correlation_id = %ctx.trace_id,
569                                route_id = route_id,
570                                "Rate limit exceeded (log only mode)"
571                            );
572                            // Continue processing
573                        }
574                        RateLimitAction::Delay => {
575                            // Apply delay if suggested by rate limiter
576                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
577                                // Cap delay at the configured maximum
578                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);
579
580                                if actual_delay > 0 {
581                                    debug!(
582                                        correlation_id = %ctx.trace_id,
583                                        route_id = route_id,
584                                        suggested_delay_ms = delay_ms,
585                                        max_delay_ms = rate_result.max_delay_ms,
586                                        actual_delay_ms = actual_delay,
587                                        "Applying rate limit delay"
588                                    );
589
590                                    tokio::time::sleep(std::time::Duration::from_millis(
591                                        actual_delay,
592                                    ))
593                                    .await;
594                                }
595                            }
596                            // Continue processing after delay
597                        }
598                    }
599                }
600            }
601        }
602
603        // Geo filtering
604        if let Some(route_id) = ctx.route_id.as_deref() {
605            if let Some(ref route_config) = ctx.route_config {
606                for filter_id in &route_config.filters {
607                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
608                        // Store country code for response header
609                        ctx.geo_country_code = result.country_code.clone();
610                        ctx.geo_lookup_performed = true;
611
612                        if !result.allowed {
613                            warn!(
614                                correlation_id = %ctx.trace_id,
615                                route_id = route_id,
616                                client_ip = %ctx.client_ip,
617                                country = ?result.country_code,
618                                filter_id = %filter_id,
619                                "Request blocked by geo filter"
620                            );
621                            self.metrics.record_blocked_request("geo_blocked");
622
623                            // Audit log the geo block
624                            let audit_entry = AuditLogEntry::new(
625                                &ctx.trace_id,
626                                AuditEventType::Blocked,
627                                &ctx.method,
628                                &ctx.path,
629                                &ctx.client_ip,
630                            )
631                            .with_route_id(route_id)
632                            .with_status_code(result.status_code)
633                            .with_reason(format!(
634                                "Geo blocked: country={}, filter={}",
635                                result.country_code.as_deref().unwrap_or("unknown"),
636                                filter_id
637                            ));
638                            self.log_manager.log_audit(&audit_entry);
639
640                            // Send geo block response
641                            let body = result
642                                .block_message
643                                .unwrap_or_else(|| "Access denied".to_string());
644
645                            crate::http_helpers::write_error(
646                                session,
647                                result.status_code,
648                                &body,
649                                "text/plain",
650                            )
651                            .await?;
652                            return Ok(true); // Request complete, don't continue
653                        }
654
655                        // Only check first geo filter that matches
656                        break;
657                    }
658                }
659            }
660        }
661
662        // Check for WebSocket upgrade requests
663        let is_websocket_upgrade = session
664            .req_header()
665            .headers
666            .get(http::header::UPGRADE)
667            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
668            .unwrap_or(false);
669
670        if is_websocket_upgrade {
671            ctx.is_websocket_upgrade = true;
672
673            // Check if route allows WebSocket upgrades
674            if let Some(ref route_config) = ctx.route_config {
675                if !route_config.websocket {
676                    warn!(
677                        correlation_id = %ctx.trace_id,
678                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
679                        client_ip = %ctx.client_ip,
680                        "WebSocket upgrade rejected: not enabled for route"
681                    );
682
683                    self.metrics.record_blocked_request("websocket_not_enabled");
684
685                    // Audit log the rejection
686                    let audit_entry = AuditLogEntry::new(
687                        &ctx.trace_id,
688                        AuditEventType::Blocked,
689                        &ctx.method,
690                        &ctx.path,
691                        &ctx.client_ip,
692                    )
693                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
694                    .with_action("websocket_rejected")
695                    .with_reason("WebSocket not enabled for route");
696                    self.log_manager.log_audit(&audit_entry);
697
698                    // Send 403 Forbidden response
699                    crate::http_helpers::write_error(
700                        session,
701                        403,
702                        "WebSocket not enabled for this route",
703                        "text/plain",
704                    )
705                    .await?;
706                    return Ok(true); // Request complete, don't continue
707                }
708
709                debug!(
710                    correlation_id = %ctx.trace_id,
711                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
712                    "WebSocket upgrade request allowed"
713                );
714
715                // Check for WebSocket frame inspection
716                if route_config.websocket_inspection {
717                    // Check for compression negotiation - skip inspection if permessage-deflate
718                    let has_compression = session
719                        .req_header()
720                        .headers
721                        .get("Sec-WebSocket-Extensions")
722                        .and_then(|v| v.to_str().ok())
723                        .map(|s| s.contains("permessage-deflate"))
724                        .unwrap_or(false);
725
726                    if has_compression {
727                        debug!(
728                            correlation_id = %ctx.trace_id,
729                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
730                            "WebSocket inspection skipped: permessage-deflate negotiated"
731                        );
732                        ctx.websocket_skip_inspection = true;
733                    } else {
734                        ctx.websocket_inspection_enabled = true;
735
736                        // Get agents that handle WebSocketFrame events
737                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
738                            sentinel_agent_protocol::EventType::WebSocketFrame,
739                        );
740
741                        debug!(
742                            correlation_id = %ctx.trace_id,
743                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
744                            agent_count = ctx.websocket_inspection_agents.len(),
745                            "WebSocket frame inspection enabled"
746                        );
747                    }
748                }
749            }
750        }
751
752        // Use cached route config from upstream_peer (avoids duplicate route matching)
753        // Handle static file and builtin routes
754        if let Some(route_config) = ctx.route_config.clone() {
755            if route_config.service_type == sentinel_config::ServiceType::Static {
756                trace!(
757                    correlation_id = %ctx.trace_id,
758                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
759                    "Handling static file route"
760                );
761                // Create a minimal RouteMatch for the handler
762                let route_match = crate::routing::RouteMatch {
763                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
764                    config: route_config.clone(),
765                };
766                return self.handle_static_route(session, ctx, &route_match).await;
767            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
768                trace!(
769                    correlation_id = %ctx.trace_id,
770                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
771                    builtin_handler = ?route_config.builtin_handler,
772                    "Handling builtin route"
773                );
774                // Create a minimal RouteMatch for the handler
775                let route_match = crate::routing::RouteMatch {
776                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
777                    config: route_config.clone(),
778                };
779                return self.handle_builtin_route(session, ctx, &route_match).await;
780            }
781        }
782
783        // API validation for API routes
784        if let Some(route_id) = ctx.route_id.clone() {
785            if let Some(validator) = self.validators.get(&route_id).await {
786                trace!(
787                    correlation_id = %ctx.trace_id,
788                    route_id = %route_id,
789                    "Running API schema validation"
790                );
791                if let Some(result) = self
792                    .validate_api_request(session, ctx, &route_id, &validator)
793                    .await?
794                {
795                    debug!(
796                        correlation_id = %ctx.trace_id,
797                        route_id = %route_id,
798                        validation_passed = result,
799                        "API validation complete"
800                    );
801                    return Ok(result);
802                }
803            }
804        }
805
806        // Get client address before mutable borrow
807        let client_addr = session
808            .client_addr()
809            .map(|a| format!("{}", a))
810            .unwrap_or_else(|| "unknown".to_string());
811        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
812
813        let req_header = session.req_header_mut();
814
815        // Add correlation ID header
816        req_header
817            .insert_header("X-Correlation-Id", &ctx.trace_id)
818            .ok();
819        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
820
821        // Use cached config (set in upstream_peer, or fetch now if needed)
822        let config = ctx
823            .config
824            .get_or_insert_with(|| self.config_manager.current());
825
826        // Enforce header limits (fast path: skip if limits are very high)
827        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
828
829        // Header count check - O(1)
830        let header_count = req_header.headers.len();
831        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
832            && header_count > config.limits.max_header_count
833        {
834            warn!(
835                correlation_id = %ctx.trace_id,
836                header_count = header_count,
837                limit = config.limits.max_header_count,
838                "Request blocked: exceeds header count limit"
839            );
840
841            self.metrics.record_blocked_request("header_count_exceeded");
842            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
843        }
844
845        // Header size check - O(n), skip if limit is very high
846        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
847            let total_header_size: usize = req_header
848                .headers
849                .iter()
850                .map(|(k, v)| k.as_str().len() + v.len())
851                .sum();
852
853            if total_header_size > config.limits.max_header_size_bytes {
854                warn!(
855                    correlation_id = %ctx.trace_id,
856                    header_size = total_header_size,
857                    limit = config.limits.max_header_size_bytes,
858                    "Request blocked: exceeds header size limit"
859                );
860
861                self.metrics.record_blocked_request("header_size_exceeded");
862                return Err(Error::explain(
863                    ErrorType::InternalError,
864                    "Headers too large",
865                ));
866            }
867        }
868
869        // Process through external agents
870        trace!(
871            correlation_id = %ctx.trace_id,
872            "Processing request through agents"
873        );
874        if let Err(e) = self
875            .process_agents(session, ctx, &client_addr, client_port)
876            .await
877        {
878            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
879            // In that case, we need to send a proper HTTP response instead of just closing the connection
880            if let ErrorType::HTTPStatus(status) = e.etype() {
881                // Extract the message from the error (the context part after "HTTPStatus context:")
882                let error_msg = e.to_string();
883                let body = error_msg
884                    .split("context:")
885                    .nth(1)
886                    .map(|s| s.trim())
887                    .unwrap_or("Request blocked");
888                debug!(
889                    correlation_id = %ctx.trace_id,
890                    status = status,
891                    body = %body,
892                    "Sending HTTP error response for agent block"
893                );
894                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
895                return Ok(true); // Request complete, don't continue to upstream
896            }
897            // For other errors, propagate them
898            return Err(e);
899        }
900
901        trace!(
902            correlation_id = %ctx.trace_id,
903            "Request filter phase complete, forwarding to upstream"
904        );
905
906        Ok(false) // Continue processing
907    }
908
909    /// Process incoming request body chunks.
910    /// Used for body size enforcement and WAF/agent inspection.
911    ///
912    /// Supports two modes:
913    /// - **Buffer mode** (default): Buffer chunks until end of stream or limit, then send to agents
914    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
915    async fn request_body_filter(
916        &self,
917        _session: &mut Session,
918        body: &mut Option<Bytes>,
919        end_of_stream: bool,
920        ctx: &mut Self::CTX,
921    ) -> Result<(), Box<Error>> {
922        use sentinel_config::BodyStreamingMode;
923
924        // Handle WebSocket frame inspection (client -> server)
925        if ctx.is_websocket_upgrade {
926            if let Some(ref handler) = ctx.websocket_handler {
927                let result = handler.process_client_data(body.take()).await;
928                match result {
929                    crate::websocket::ProcessResult::Forward(data) => {
930                        *body = data;
931                    }
932                    crate::websocket::ProcessResult::Close(reason) => {
933                        warn!(
934                            correlation_id = %ctx.trace_id,
935                            code = reason.code,
936                            reason = %reason.reason,
937                            "WebSocket connection closed by agent (client->server)"
938                        );
939                        // Return an error to close the connection
940                        return Err(Error::explain(
941                            ErrorType::InternalError,
942                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
943                        ));
944                    }
945                }
946            }
947            // Skip normal body processing for WebSocket
948            return Ok(());
949        }
950
951        // Track request body size
952        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
953        if chunk_len > 0 {
954            ctx.request_body_bytes += chunk_len as u64;
955
956            trace!(
957                correlation_id = %ctx.trace_id,
958                chunk_size = chunk_len,
959                total_body_bytes = ctx.request_body_bytes,
960                end_of_stream = end_of_stream,
961                streaming_mode = ?ctx.request_body_streaming_mode,
962                "Processing request body chunk"
963            );
964
965            // Check body size limit (use cached config)
966            let config = ctx
967                .config
968                .get_or_insert_with(|| self.config_manager.current());
969            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
970                warn!(
971                    correlation_id = %ctx.trace_id,
972                    body_bytes = ctx.request_body_bytes,
973                    limit = config.limits.max_body_size_bytes,
974                    "Request body size limit exceeded"
975                );
976                self.metrics.record_blocked_request("body_size_exceeded");
977                return Err(Error::explain(
978                    ErrorType::InternalError,
979                    "Request body too large",
980                ));
981            }
982        }
983
984        // Body inspection for agents (WAF, etc.)
985        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
986            let config = ctx
987                .config
988                .get_or_insert_with(|| self.config_manager.current());
989            let max_inspection_bytes = config
990                .waf
991                .as_ref()
992                .map(|w| w.body_inspection.max_inspection_bytes as u64)
993                .unwrap_or(1024 * 1024);
994
995            match ctx.request_body_streaming_mode {
996                BodyStreamingMode::Stream => {
997                    // Stream mode: send each chunk immediately
998                    if body.is_some() {
999                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1000                            .await?;
1001                    } else if end_of_stream && ctx.agent_needs_more {
1002                        // Send final empty chunk to signal end
1003                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1004                            .await?;
1005                    }
1006                }
1007                BodyStreamingMode::Hybrid { buffer_threshold } => {
1008                    // Hybrid mode: buffer up to threshold, then stream
1009                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1010                        // Still in buffering phase
1011                        if let Some(ref chunk) = body {
1012                            let bytes_to_buffer = std::cmp::min(
1013                                chunk.len(),
1014                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1015                            );
1016                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1017                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1018
1019                            // If we've reached threshold or end of stream, switch to streaming
1020                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1021                            {
1022                                // Send buffered content first
1023                                self.send_buffered_body_to_agents(
1024                                    end_of_stream && chunk.len() == bytes_to_buffer,
1025                                    ctx,
1026                                )
1027                                .await?;
1028                                ctx.body_buffer.clear();
1029
1030                                // If there's remaining data in this chunk, stream it
1031                                if bytes_to_buffer < chunk.len() {
1032                                    let remaining = chunk.slice(bytes_to_buffer..);
1033                                    let mut remaining_body = Some(remaining);
1034                                    self.process_body_chunk_streaming(
1035                                        &mut remaining_body,
1036                                        end_of_stream,
1037                                        ctx,
1038                                    )
1039                                    .await?;
1040                                }
1041                            }
1042                        }
1043                    } else {
1044                        // Past threshold, stream directly
1045                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1046                            .await?;
1047                    }
1048                }
1049                BodyStreamingMode::Buffer => {
1050                    // Buffer mode: collect chunks until ready to send
1051                    if let Some(ref chunk) = body {
1052                        if ctx.body_bytes_inspected < max_inspection_bytes {
1053                            let bytes_to_inspect = std::cmp::min(
1054                                chunk.len() as u64,
1055                                max_inspection_bytes - ctx.body_bytes_inspected,
1056                            ) as usize;
1057
1058                            ctx.body_buffer
1059                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1060                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1061
1062                            trace!(
1063                                correlation_id = %ctx.trace_id,
1064                                bytes_inspected = ctx.body_bytes_inspected,
1065                                max_inspection_bytes = max_inspection_bytes,
1066                                buffer_size = ctx.body_buffer.len(),
1067                                "Buffering body for agent inspection"
1068                            );
1069                        }
1070                    }
1071
1072                    // Send when complete or limit reached
1073                    let should_send =
1074                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1075                    if should_send && !ctx.body_buffer.is_empty() {
1076                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1077                            .await?;
1078                        ctx.body_buffer.clear();
1079                    }
1080                }
1081            }
1082        }
1083
1084        if end_of_stream {
1085            trace!(
1086                correlation_id = %ctx.trace_id,
1087                total_body_bytes = ctx.request_body_bytes,
1088                bytes_inspected = ctx.body_bytes_inspected,
1089                "Request body complete"
1090            );
1091        }
1092
1093        Ok(())
1094    }
1095
1096    async fn response_filter(
1097        &self,
1098        _session: &mut Session,
1099        upstream_response: &mut ResponseHeader,
1100        ctx: &mut Self::CTX,
1101    ) -> Result<(), Box<Error>> {
1102        let status = upstream_response.status.as_u16();
1103        let duration = ctx.elapsed();
1104
1105        trace!(
1106            correlation_id = %ctx.trace_id,
1107            status = status,
1108            "Starting response filter phase"
1109        );
1110
1111        // Handle WebSocket 101 Switching Protocols
1112        if status == 101 && ctx.is_websocket_upgrade {
1113            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1114                // Create WebSocket inspector and handler with metrics
1115                let inspector = crate::websocket::WebSocketInspector::with_metrics(
1116                    self.agent_manager.clone(),
1117                    ctx.route_id
1118                        .clone()
1119                        .unwrap_or_else(|| "unknown".to_string()),
1120                    ctx.trace_id.clone(),
1121                    ctx.client_ip.clone(),
1122                    100, // 100ms timeout per frame inspection
1123                    Some(self.metrics.clone()),
1124                );
1125
1126                let handler = crate::websocket::WebSocketHandler::new(
1127                    std::sync::Arc::new(inspector),
1128                    1024 * 1024, // 1MB max frame size
1129                );
1130
1131                ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1132
1133                info!(
1134                    correlation_id = %ctx.trace_id,
1135                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1136                    agent_count = ctx.websocket_inspection_agents.len(),
1137                    "WebSocket upgrade successful, frame inspection enabled"
1138                );
1139            } else if ctx.websocket_skip_inspection {
1140                debug!(
1141                    correlation_id = %ctx.trace_id,
1142                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1143                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1144                );
1145            } else {
1146                debug!(
1147                    correlation_id = %ctx.trace_id,
1148                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1149                    "WebSocket upgrade successful"
1150                );
1151            }
1152        }
1153
1154        // Apply security headers
1155        trace!(
1156            correlation_id = %ctx.trace_id,
1157            "Applying security headers"
1158        );
1159        self.apply_security_headers(upstream_response).ok();
1160
1161        // Add correlation ID to response
1162        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1163
1164        // Add rate limit headers if rate limiting was applied
1165        if let Some(ref rate_info) = ctx.rate_limit_info {
1166            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1167            upstream_response
1168                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1169            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1170        }
1171
1172        // Add GeoIP country header if geo lookup was performed
1173        if let Some(ref country_code) = ctx.geo_country_code {
1174            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1175        }
1176
1177        // Generate custom error pages for error responses
1178        if status >= 400 {
1179            trace!(
1180                correlation_id = %ctx.trace_id,
1181                status = status,
1182                "Handling error response"
1183            );
1184            self.handle_error_response(upstream_response, ctx).await?;
1185        }
1186
1187        // Record metrics
1188        self.metrics.record_request(
1189            ctx.route_id.as_deref().unwrap_or("unknown"),
1190            &ctx.method,
1191            status,
1192            duration,
1193        );
1194
1195        // Record OpenTelemetry span status
1196        if let Some(ref mut span) = ctx.otel_span {
1197            span.set_status(status);
1198            if let Some(ref upstream) = ctx.upstream {
1199                span.set_upstream(upstream, "");
1200            }
1201            if status >= 500 {
1202                span.record_error(&format!("HTTP {}", status));
1203            }
1204        }
1205
1206        // Record passive health check
1207        if let Some(ref upstream) = ctx.upstream {
1208            let success = status < 500;
1209
1210            trace!(
1211                correlation_id = %ctx.trace_id,
1212                upstream = %upstream,
1213                success = success,
1214                status = status,
1215                "Recording passive health check result"
1216            );
1217
1218            let error_msg = if !success {
1219                Some(format!("HTTP {}", status))
1220            } else {
1221                None
1222            };
1223            self.passive_health
1224                .record_outcome(upstream, success, error_msg.as_deref())
1225                .await;
1226
1227            // Report to upstream pool
1228            if let Some(pool) = self.upstream_pools.get(upstream).await {
1229                pool.report_result(upstream, success).await;
1230            }
1231
1232            if !success {
1233                warn!(
1234                    correlation_id = %ctx.trace_id,
1235                    upstream = %upstream,
1236                    status = status,
1237                    "Upstream returned error status"
1238                );
1239            }
1240        }
1241
1242        // Final request completion log
1243        if status >= 500 {
1244            error!(
1245                correlation_id = %ctx.trace_id,
1246                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1247                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1248                method = %ctx.method,
1249                path = %ctx.path,
1250                status = status,
1251                duration_ms = duration.as_millis(),
1252                attempts = ctx.upstream_attempts,
1253                "Request completed with server error"
1254            );
1255        } else if status >= 400 {
1256            warn!(
1257                correlation_id = %ctx.trace_id,
1258                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1259                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1260                method = %ctx.method,
1261                path = %ctx.path,
1262                status = status,
1263                duration_ms = duration.as_millis(),
1264                "Request completed with client error"
1265            );
1266        } else {
1267            debug!(
1268                correlation_id = %ctx.trace_id,
1269                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1271                method = %ctx.method,
1272                path = %ctx.path,
1273                status = status,
1274                duration_ms = duration.as_millis(),
1275                attempts = ctx.upstream_attempts,
1276                "Request completed"
1277            );
1278        }
1279
1280        Ok(())
1281    }
1282
1283    /// Modify the request before sending to upstream.
1284    /// Used for header modifications, adding authentication, etc.
1285    async fn upstream_request_filter(
1286        &self,
1287        _session: &mut Session,
1288        upstream_request: &mut pingora::http::RequestHeader,
1289        ctx: &mut Self::CTX,
1290    ) -> Result<()>
1291    where
1292        Self::CTX: Send + Sync,
1293    {
1294        trace!(
1295            correlation_id = %ctx.trace_id,
1296            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1297            "Applying upstream request modifications"
1298        );
1299
1300        // Add trace ID header for upstream correlation
1301        upstream_request
1302            .insert_header("X-Trace-Id", &ctx.trace_id)
1303            .ok();
1304
1305        // Add W3C traceparent header for distributed tracing
1306        if let Some(ref span) = ctx.otel_span {
1307            let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1308            let traceparent =
1309                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1310            upstream_request
1311                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1312                .ok();
1313        }
1314
1315        // Add request metadata headers
1316        upstream_request
1317            .insert_header("X-Forwarded-By", "Sentinel")
1318            .ok();
1319
1320        // Apply route-specific request header modifications
1321        // Clone the modifications to avoid lifetime issues with the header API
1322        if let Some(ref route_config) = ctx.route_config {
1323            let mods = route_config.policies.request_headers.clone();
1324
1325            // Set headers (overwrite existing)
1326            for (name, value) in mods.set {
1327                upstream_request.insert_header(name, value).ok();
1328            }
1329
1330            // Add headers (append)
1331            for (name, value) in mods.add {
1332                upstream_request.append_header(name, value).ok();
1333            }
1334
1335            // Remove headers
1336            for name in &mods.remove {
1337                upstream_request.remove_header(name);
1338            }
1339
1340            trace!(
1341                correlation_id = %ctx.trace_id,
1342                "Applied request header modifications"
1343            );
1344        }
1345
1346        // Remove sensitive headers that shouldn't go to upstream
1347        upstream_request.remove_header("X-Internal-Token");
1348        upstream_request.remove_header("Authorization-Internal");
1349
1350        // === Traffic Mirroring / Shadowing ===
1351        // Check if this route has shadow configuration
1352        if let Some(ref route_config) = ctx.route_config {
1353            if let Some(ref shadow_config) = route_config.shadow {
1354                // Get snapshot of upstream pools for shadow manager
1355                let pools_snapshot = self.upstream_pools.snapshot().await;
1356                let upstream_pools = std::sync::Arc::new(pools_snapshot);
1357
1358                // Get route ID for metrics labeling
1359                let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1360
1361                // Create shadow manager
1362                let shadow_manager = crate::shadow::ShadowManager::new(
1363                    upstream_pools,
1364                    shadow_config.clone(),
1365                    Some(std::sync::Arc::clone(&self.metrics)),
1366                    route_id,
1367                );
1368
1369                // Check if we should shadow this request (sampling + header check)
1370                if shadow_manager.should_shadow(upstream_request) {
1371                    trace!(
1372                        correlation_id = %ctx.trace_id,
1373                        shadow_upstream = %shadow_config.upstream,
1374                        percentage = shadow_config.percentage,
1375                        "Shadowing request"
1376                    );
1377
1378                    // Clone headers for shadow request
1379                    let shadow_headers = upstream_request.clone();
1380
1381                    // Create request context for shadow (simplified from proxy context)
1382                    let shadow_ctx = crate::upstream::RequestContext {
1383                        client_ip: ctx.client_ip.parse().ok(),
1384                        headers: std::collections::HashMap::new(), // Empty for now
1385                        path: ctx.path.clone(),
1386                        method: ctx.method.clone(),
1387                    };
1388
1389                    // Determine if we should buffer the body
1390                    let buffer_body = shadow_config.buffer_body
1391                        && crate::shadow::should_buffer_method(&ctx.method);
1392
1393                    // TODO: Buffer request body if needed
1394                    // For now, we don't buffer bodies (would require significant refactoring
1395                    // of the request_body_filter to capture and clone body data)
1396                    let body = None;
1397
1398                    // Fire off shadow request asynchronously (non-blocking)
1399                    shadow_manager.shadow_request(shadow_headers, body, shadow_ctx);
1400                }
1401            }
1402        }
1403
1404        Ok(())
1405    }
1406
1407    /// Process response body chunks from upstream.
1408    /// Used for response size tracking and WAF inspection.
1409    ///
1410    /// Note: Response body inspection is currently buffered only (streaming mode not supported
1411    /// for responses due to Pingora's synchronous filter design).
1412    fn response_body_filter(
1413        &self,
1414        _session: &mut Session,
1415        body: &mut Option<Bytes>,
1416        end_of_stream: bool,
1417        ctx: &mut Self::CTX,
1418    ) -> Result<Option<Duration>, Box<Error>> {
1419        // Handle WebSocket frame inspection (server -> client)
1420        // Note: This filter is synchronous, so we use block_in_place for async agent calls
1421        if ctx.is_websocket_upgrade {
1422            if let Some(ref handler) = ctx.websocket_handler {
1423                let handler = handler.clone();
1424                let data = body.take();
1425
1426                // Use block_in_place to run async handler from sync context
1427                // This is safe because Pingora uses a multi-threaded tokio runtime
1428                let result = tokio::task::block_in_place(|| {
1429                    tokio::runtime::Handle::current()
1430                        .block_on(async { handler.process_server_data(data).await })
1431                });
1432
1433                match result {
1434                    crate::websocket::ProcessResult::Forward(data) => {
1435                        *body = data;
1436                    }
1437                    crate::websocket::ProcessResult::Close(reason) => {
1438                        warn!(
1439                            correlation_id = %ctx.trace_id,
1440                            code = reason.code,
1441                            reason = %reason.reason,
1442                            "WebSocket connection closed by agent (server->client)"
1443                        );
1444                        // For sync filter, we can't return an error that closes the connection
1445                        // Instead, inject a close frame
1446                        let close_frame =
1447                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1448                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1449                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1450                            *body = Some(Bytes::from(encoded));
1451                        }
1452                    }
1453                }
1454            }
1455            // Skip normal body processing for WebSocket
1456            return Ok(None);
1457        }
1458
1459        // Track response body size
1460        if let Some(ref chunk) = body {
1461            ctx.response_bytes += chunk.len() as u64;
1462
1463            trace!(
1464                correlation_id = %ctx.trace_id,
1465                chunk_size = chunk.len(),
1466                total_response_bytes = ctx.response_bytes,
1467                end_of_stream = end_of_stream,
1468                "Processing response body chunk"
1469            );
1470
1471            // Response body inspection (buffered mode only)
1472            // Note: Streaming mode for response bodies is not currently supported
1473            // due to Pingora's synchronous response_body_filter design
1474            if ctx.response_body_inspection_enabled
1475                && !ctx.response_body_inspection_agents.is_empty()
1476            {
1477                let config = ctx
1478                    .config
1479                    .get_or_insert_with(|| self.config_manager.current());
1480                let max_inspection_bytes = config
1481                    .waf
1482                    .as_ref()
1483                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
1484                    .unwrap_or(1024 * 1024);
1485
1486                if ctx.response_body_bytes_inspected < max_inspection_bytes {
1487                    let bytes_to_inspect = std::cmp::min(
1488                        chunk.len() as u64,
1489                        max_inspection_bytes - ctx.response_body_bytes_inspected,
1490                    ) as usize;
1491
1492                    // Buffer for later processing (during logging phase)
1493                    // Response body inspection happens asynchronously and results
1494                    // are logged rather than blocking the response
1495                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1496                    ctx.response_body_chunk_index += 1;
1497
1498                    trace!(
1499                        correlation_id = %ctx.trace_id,
1500                        bytes_inspected = ctx.response_body_bytes_inspected,
1501                        max_inspection_bytes = max_inspection_bytes,
1502                        chunk_index = ctx.response_body_chunk_index,
1503                        "Tracking response body for inspection"
1504                    );
1505                }
1506            }
1507        }
1508
1509        if end_of_stream {
1510            trace!(
1511                correlation_id = %ctx.trace_id,
1512                total_response_bytes = ctx.response_bytes,
1513                response_bytes_inspected = ctx.response_body_bytes_inspected,
1514                "Response body complete"
1515            );
1516        }
1517
1518        // Return None to indicate no delay needed
1519        Ok(None)
1520    }
1521
1522    /// Called when a connection to upstream is established or reused.
1523    /// Logs connection reuse statistics for observability.
1524    async fn connected_to_upstream(
1525        &self,
1526        _session: &mut Session,
1527        reused: bool,
1528        peer: &HttpPeer,
1529        #[cfg(unix)] _fd: RawFd,
1530        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1531        digest: Option<&Digest>,
1532        ctx: &mut Self::CTX,
1533    ) -> Result<(), Box<Error>> {
1534        // Track connection reuse for metrics
1535        ctx.connection_reused = reused;
1536
1537        // Log connection establishment/reuse
1538        if reused {
1539            trace!(
1540                correlation_id = %ctx.trace_id,
1541                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1542                peer_address = %peer.address(),
1543                "Reusing existing upstream connection"
1544            );
1545        } else {
1546            debug!(
1547                correlation_id = %ctx.trace_id,
1548                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1549                peer_address = %peer.address(),
1550                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1551                "Established new upstream connection"
1552            );
1553        }
1554
1555        Ok(())
1556    }
1557
1558    // =========================================================================
1559    // HTTP Caching - Pingora Cache Integration
1560    // =========================================================================
1561
1562    /// Decide if the request should use caching.
1563    ///
1564    /// This method is called early in the request lifecycle to determine if
1565    /// the response should be served from cache or if the response should
1566    /// be cached.
1567    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1568        // Check if route has caching enabled
1569        let route_id = match ctx.route_id.as_deref() {
1570            Some(id) => id,
1571            None => {
1572                trace!(
1573                    correlation_id = %ctx.trace_id,
1574                    "Cache filter: no route ID, skipping cache"
1575                );
1576                return Ok(());
1577            }
1578        };
1579
1580        // Check if caching is enabled for this route
1581        if !self.cache_manager.is_enabled(route_id) {
1582            trace!(
1583                correlation_id = %ctx.trace_id,
1584                route_id = %route_id,
1585                "Cache disabled for route"
1586            );
1587            return Ok(());
1588        }
1589
1590        // Check if method is cacheable (typically GET/HEAD)
1591        if !self
1592            .cache_manager
1593            .is_method_cacheable(route_id, &ctx.method)
1594        {
1595            trace!(
1596                correlation_id = %ctx.trace_id,
1597                route_id = %route_id,
1598                method = %ctx.method,
1599                "Method not cacheable"
1600            );
1601            return Ok(());
1602        }
1603
1604        // Enable caching for this request using Pingora's cache infrastructure
1605        debug!(
1606            correlation_id = %ctx.trace_id,
1607            route_id = %route_id,
1608            method = %ctx.method,
1609            path = %ctx.path,
1610            "Enabling HTTP caching for request"
1611        );
1612
1613        // Get static references to cache infrastructure
1614        let storage = get_cache_storage();
1615        let eviction = get_cache_eviction();
1616        let cache_lock = get_cache_lock();
1617
1618        // Enable the cache with storage, eviction, and lock
1619        session.cache.enable(
1620            storage,
1621            Some(eviction),
1622            None, // predictor - optional
1623            Some(cache_lock),
1624            None, // option overrides
1625        );
1626
1627        // Mark request as cache-eligible in context
1628        ctx.cache_eligible = true;
1629
1630        trace!(
1631            correlation_id = %ctx.trace_id,
1632            route_id = %route_id,
1633            cache_enabled = session.cache.enabled(),
1634            "Cache enabled for request"
1635        );
1636
1637        Ok(())
1638    }
1639
1640    /// Generate the cache key for this request.
1641    ///
1642    /// The cache key uniquely identifies the cached response. It typically
1643    /// includes the method, host, path, and potentially query parameters.
1644    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1645        let req_header = session.req_header();
1646        let method = req_header.method.as_str();
1647        let path = req_header.uri.path();
1648        let host = ctx.host.as_deref().unwrap_or("unknown");
1649        let query = req_header.uri.query();
1650
1651        // Generate cache key using our cache manager
1652        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1653
1654        trace!(
1655            correlation_id = %ctx.trace_id,
1656            cache_key = %key_string,
1657            "Generated cache key"
1658        );
1659
1660        // Use Pingora's default cache key generator which handles
1661        // proper hashing and internal format
1662        Ok(CacheKey::default(req_header))
1663    }
1664
1665    /// Called when a cache miss occurs.
1666    ///
1667    /// This is called when the cache lookup found no matching entry.
1668    /// We can use this to log and track cache misses.
1669    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1670        // Let Pingora handle the cache miss
1671        session.cache.cache_miss();
1672
1673        // Track statistics
1674        if let Some(route_id) = ctx.route_id.as_deref() {
1675            self.cache_manager.stats().record_miss();
1676
1677            trace!(
1678                correlation_id = %ctx.trace_id,
1679                route_id = %route_id,
1680                path = %ctx.path,
1681                "Cache miss"
1682            );
1683        }
1684    }
1685
1686    /// Called after a successful cache lookup.
1687    ///
1688    /// This filter allows inspecting the cached response before serving it.
1689    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
1690    /// to invalidate and refetch.
1691    async fn cache_hit_filter(
1692        &self,
1693        session: &mut Session,
1694        meta: &CacheMeta,
1695        _hit_handler: &mut HitHandler,
1696        is_fresh: bool,
1697        ctx: &mut Self::CTX,
1698    ) -> Result<Option<ForcedInvalidationKind>>
1699    where
1700        Self::CTX: Send + Sync,
1701    {
1702        // Check if this cache entry should be invalidated due to a purge request
1703        let req_header = session.req_header();
1704        let method = req_header.method.as_str();
1705        let path = req_header.uri.path();
1706        let host = req_header.uri.host().unwrap_or("localhost");
1707        let query = req_header.uri.query();
1708
1709        // Generate the cache key for this request
1710        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1711
1712        // Check if this key should be invalidated
1713        if self.cache_manager.should_invalidate(&cache_key) {
1714            info!(
1715                correlation_id = %ctx.trace_id,
1716                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1717                cache_key = %cache_key,
1718                "Cache entry invalidated by purge request"
1719            );
1720            // Force expiration so the entry is refetched from upstream
1721            return Ok(Some(ForcedInvalidationKind::ForceExpired));
1722        }
1723
1724        // Track cache hit statistics
1725        if is_fresh {
1726            self.cache_manager.stats().record_hit();
1727
1728            debug!(
1729                correlation_id = %ctx.trace_id,
1730                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1731                is_fresh = is_fresh,
1732                "Cache hit (fresh)"
1733            );
1734        } else {
1735            trace!(
1736                correlation_id = %ctx.trace_id,
1737                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1738                is_fresh = is_fresh,
1739                "Cache hit (stale)"
1740            );
1741        }
1742
1743        // Serve the cached response without invalidation
1744        Ok(None)
1745    }
1746
1747    /// Decide if the response should be cached.
1748    ///
1749    /// Called after receiving the response from upstream to determine
1750    /// if it should be stored in the cache.
1751    fn response_cache_filter(
1752        &self,
1753        _session: &Session,
1754        resp: &ResponseHeader,
1755        ctx: &mut Self::CTX,
1756    ) -> Result<RespCacheable> {
1757        let route_id = match ctx.route_id.as_deref() {
1758            Some(id) => id,
1759            None => {
1760                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1761                    "no_route",
1762                )));
1763            }
1764        };
1765
1766        // Check if caching is enabled for this route
1767        if !self.cache_manager.is_enabled(route_id) {
1768            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1769                "disabled",
1770            )));
1771        }
1772
1773        let status = resp.status.as_u16();
1774
1775        // Check if status code is cacheable
1776        if !self.cache_manager.is_status_cacheable(route_id, status) {
1777            trace!(
1778                correlation_id = %ctx.trace_id,
1779                route_id = %route_id,
1780                status = status,
1781                "Status code not cacheable"
1782            );
1783            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1784        }
1785
1786        // Check Cache-Control header for no-store, no-cache, private
1787        if let Some(cache_control) = resp.headers.get("cache-control") {
1788            if let Ok(cc_str) = cache_control.to_str() {
1789                if crate::cache::CacheManager::is_no_cache(cc_str) {
1790                    trace!(
1791                        correlation_id = %ctx.trace_id,
1792                        route_id = %route_id,
1793                        cache_control = %cc_str,
1794                        "Response has no-cache directive"
1795                    );
1796                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1797                }
1798            }
1799        }
1800
1801        // Calculate TTL from Cache-Control or use default
1802        let cache_control = resp
1803            .headers
1804            .get("cache-control")
1805            .and_then(|v| v.to_str().ok());
1806        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1807
1808        if ttl.is_zero() {
1809            trace!(
1810                correlation_id = %ctx.trace_id,
1811                route_id = %route_id,
1812                "TTL is zero, not caching"
1813            );
1814            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1815        }
1816
1817        // Get route cache config for stale settings
1818        let config = self
1819            .cache_manager
1820            .get_route_config(route_id)
1821            .unwrap_or_default();
1822
1823        // Create timestamps for cache metadata
1824        let now = std::time::SystemTime::now();
1825        let fresh_until = now + ttl;
1826
1827        // Clone the response header for storage
1828        let header = resp.clone();
1829
1830        // Create CacheMeta with proper timestamps and TTLs
1831        let cache_meta = CacheMeta::new(
1832            fresh_until,
1833            now,
1834            config.stale_while_revalidate_secs as u32,
1835            config.stale_if_error_secs as u32,
1836            header,
1837        );
1838
1839        // Track the cache store
1840        self.cache_manager.stats().record_store();
1841
1842        debug!(
1843            correlation_id = %ctx.trace_id,
1844            route_id = %route_id,
1845            status = status,
1846            ttl_secs = ttl.as_secs(),
1847            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1848            stale_if_error_secs = config.stale_if_error_secs,
1849            "Caching response"
1850        );
1851
1852        Ok(RespCacheable::Cacheable(cache_meta))
1853    }
1854
1855    /// Decide whether to serve stale content on error or during revalidation.
1856    ///
1857    /// This implements stale-while-revalidate and stale-if-error semantics.
1858    fn should_serve_stale(
1859        &self,
1860        _session: &mut Session,
1861        ctx: &mut Self::CTX,
1862        error: Option<&Error>,
1863    ) -> bool {
1864        let route_id = match ctx.route_id.as_deref() {
1865            Some(id) => id,
1866            None => return false,
1867        };
1868
1869        // Get route cache config for stale settings
1870        let config = match self.cache_manager.get_route_config(route_id) {
1871            Some(c) => c,
1872            None => return false,
1873        };
1874
1875        // If there's an upstream error, use stale-if-error
1876        if let Some(e) = error {
1877            // Only serve stale for upstream errors
1878            if e.esource() == &pingora::ErrorSource::Upstream {
1879                debug!(
1880                    correlation_id = %ctx.trace_id,
1881                    route_id = %route_id,
1882                    error = %e,
1883                    stale_if_error_secs = config.stale_if_error_secs,
1884                    "Considering stale-if-error"
1885                );
1886                return config.stale_if_error_secs > 0;
1887            }
1888        }
1889
1890        // During stale-while-revalidate (error is None)
1891        if error.is_none() && config.stale_while_revalidate_secs > 0 {
1892            trace!(
1893                correlation_id = %ctx.trace_id,
1894                route_id = %route_id,
1895                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1896                "Allowing stale-while-revalidate"
1897            );
1898            return true;
1899        }
1900
1901        false
1902    }
1903
1904    /// Handle Range header for byte-range requests (streaming support).
1905    ///
1906    /// This method is called when a Range header is present in the request.
1907    /// It allows proper handling of:
1908    /// - Video streaming (HTML5 video seeking)
1909    /// - Large file downloads with resume support
1910    /// - Partial content delivery
1911    ///
1912    /// Uses Pingora's built-in range handling with route-specific logging.
1913    fn range_header_filter(
1914        &self,
1915        session: &mut Session,
1916        response: &mut ResponseHeader,
1917        ctx: &mut Self::CTX,
1918    ) -> pingora_proxy::RangeType
1919    where
1920        Self::CTX: Send + Sync,
1921    {
1922        // Check if route supports range requests
1923        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1924            // Static file routes and media routes should support range requests
1925            matches!(
1926                config.service_type,
1927                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1928            )
1929        });
1930
1931        if !supports_range {
1932            trace!(
1933                correlation_id = %ctx.trace_id,
1934                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1935                "Range request not supported for this route type"
1936            );
1937            return pingora_proxy::RangeType::None;
1938        }
1939
1940        // Use Pingora's built-in range header parsing and handling
1941        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1942
1943        match &range_type {
1944            pingora_proxy::RangeType::None => {
1945                trace!(
1946                    correlation_id = %ctx.trace_id,
1947                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1948                    "No range request or not applicable"
1949                );
1950            }
1951            pingora_proxy::RangeType::Single(range) => {
1952                trace!(
1953                    correlation_id = %ctx.trace_id,
1954                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1955                    range_start = range.start,
1956                    range_end = range.end,
1957                    "Processing single-range request"
1958                );
1959            }
1960            pingora_proxy::RangeType::Multi(multi) => {
1961                trace!(
1962                    correlation_id = %ctx.trace_id,
1963                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1964                    range_count = multi.ranges.len(),
1965                    "Processing multi-range request"
1966                );
1967            }
1968            pingora_proxy::RangeType::Invalid => {
1969                debug!(
1970                    correlation_id = %ctx.trace_id,
1971                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1972                    "Invalid range header"
1973                );
1974            }
1975        }
1976
1977        range_type
1978    }
1979
1980    /// Handle fatal proxy errors by generating custom error pages.
1981    /// Called when the proxy itself fails to process the request.
1982    async fn fail_to_proxy(
1983        &self,
1984        session: &mut Session,
1985        e: &Error,
1986        ctx: &mut Self::CTX,
1987    ) -> pingora_proxy::FailToProxy
1988    where
1989        Self::CTX: Send + Sync,
1990    {
1991        let error_code = match e.etype() {
1992            // Connection errors
1993            ErrorType::ConnectRefused => 503,
1994            ErrorType::ConnectTimedout => 504,
1995            ErrorType::ConnectNoRoute => 502,
1996
1997            // Timeout errors
1998            ErrorType::ReadTimedout => 504,
1999            ErrorType::WriteTimedout => 504,
2000
2001            // TLS errors
2002            ErrorType::TLSHandshakeFailure => 502,
2003            ErrorType::InvalidCert => 502,
2004
2005            // Protocol errors
2006            ErrorType::InvalidHTTPHeader => 400,
2007            ErrorType::H2Error => 502,
2008
2009            // Resource errors
2010            ErrorType::ConnectProxyFailure => 502,
2011            ErrorType::ConnectionClosed => 502,
2012
2013            // Explicit HTTP status (e.g., from agent fail-closed blocking)
2014            ErrorType::HTTPStatus(status) => *status,
2015
2016            // Internal errors - return 502 for upstream issues (more accurate than 500)
2017            ErrorType::InternalError => {
2018                // Check if this is an upstream-related error
2019                let error_str = e.to_string();
2020                if error_str.contains("upstream")
2021                    || error_str.contains("DNS")
2022                    || error_str.contains("resolve")
2023                {
2024                    502
2025                } else {
2026                    500
2027                }
2028            }
2029
2030            // Default to 502 for unknown errors
2031            _ => 502,
2032        };
2033
2034        error!(
2035            correlation_id = %ctx.trace_id,
2036            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2037            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2038            error_type = ?e.etype(),
2039            error = %e,
2040            error_code = error_code,
2041            "Proxy error occurred"
2042        );
2043
2044        // Record the error in metrics
2045        self.metrics
2046            .record_blocked_request(&format!("proxy_error_{}", error_code));
2047
2048        // Write error response to ensure client receives a proper HTTP response
2049        // This is necessary because some errors occur before the upstream connection
2050        // is established, and Pingora may not send a response automatically
2051        let error_message = match error_code {
2052            400 => "Bad Request",
2053            502 => "Bad Gateway",
2054            503 => "Service Unavailable",
2055            504 => "Gateway Timeout",
2056            _ => "Internal Server Error",
2057        };
2058
2059        // Build a minimal error response body
2060        let body = format!(
2061            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2062            error_code, error_message, ctx.trace_id
2063        );
2064
2065        // Write the response header
2066        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2067        header
2068            .insert_header("Content-Type", "application/json")
2069            .ok();
2070        header
2071            .insert_header("Content-Length", body.len().to_string())
2072            .ok();
2073        header
2074            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2075            .ok();
2076        header.insert_header("Connection", "close").ok();
2077
2078        // Write headers and body
2079        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2080            warn!(
2081                correlation_id = %ctx.trace_id,
2082                error = %write_err,
2083                "Failed to write error response header"
2084            );
2085        } else {
2086            // Write the body
2087            if let Err(write_err) = session
2088                .write_response_body(Some(bytes::Bytes::from(body)), true)
2089                .await
2090            {
2091                warn!(
2092                    correlation_id = %ctx.trace_id,
2093                    error = %write_err,
2094                    "Failed to write error response body"
2095                );
2096            }
2097        }
2098
2099        // Return the error response info
2100        // can_reuse_downstream: false since we already wrote and closed the response
2101        pingora_proxy::FailToProxy {
2102            error_code,
2103            can_reuse_downstream: false,
2104        }
2105    }
2106
2107    /// Handle errors that occur during proxying after upstream connection is established.
2108    ///
2109    /// This method enables retry logic and circuit breaker integration.
2110    /// It's called when an error occurs during the request/response exchange
2111    /// with the upstream server.
2112    fn error_while_proxy(
2113        &self,
2114        peer: &HttpPeer,
2115        session: &mut Session,
2116        e: Box<Error>,
2117        ctx: &mut Self::CTX,
2118        client_reused: bool,
2119    ) -> Box<Error> {
2120        let error_type = e.etype().clone();
2121        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2122
2123        // Classify error for retry decisions
2124        let is_retryable = matches!(
2125            error_type,
2126            ErrorType::ConnectTimedout
2127                | ErrorType::ReadTimedout
2128                | ErrorType::WriteTimedout
2129                | ErrorType::ConnectionClosed
2130                | ErrorType::ConnectRefused
2131        );
2132
2133        // Log the error with context
2134        warn!(
2135            correlation_id = %ctx.trace_id,
2136            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2137            upstream = %upstream_id,
2138            peer_address = %peer.address(),
2139            error_type = ?error_type,
2140            error = %e,
2141            client_reused = client_reused,
2142            is_retryable = is_retryable,
2143            "Error during proxy operation"
2144        );
2145
2146        // Record failure with circuit breaker via upstream pool
2147        // This is done asynchronously since we can't await in a sync fn
2148        let peer_address = peer.address().to_string();
2149        let upstream_pools = self.upstream_pools.clone();
2150        let upstream_id_owned = upstream_id.to_string();
2151        tokio::spawn(async move {
2152            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2153                pool.report_result(&peer_address, false).await;
2154            }
2155        });
2156
2157        // Metrics tracking
2158        self.metrics
2159            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2160
2161        // Create enhanced error with retry information
2162        let mut enhanced_error = e.more_context(format!(
2163            "Upstream: {}, Peer: {}, Attempts: {}",
2164            upstream_id,
2165            peer.address(),
2166            ctx.upstream_attempts
2167        ));
2168
2169        // Determine if retry should be attempted:
2170        // - Only retry if error is retryable type
2171        // - Only retry reused connections if buffer isn't truncated
2172        // - Track retry metrics
2173        if is_retryable {
2174            let can_retry = if client_reused {
2175                // For reused connections, check if retry buffer is intact
2176                !session.as_ref().retry_buffer_truncated()
2177            } else {
2178                // Fresh connections can always retry
2179                true
2180            };
2181
2182            enhanced_error.retry.decide_reuse(can_retry);
2183
2184            if can_retry {
2185                debug!(
2186                    correlation_id = %ctx.trace_id,
2187                    upstream = %upstream_id,
2188                    error_type = ?error_type,
2189                    "Error is retryable, will attempt retry"
2190                );
2191            }
2192        } else {
2193            // Non-retryable error - don't retry
2194            enhanced_error.retry.decide_reuse(false);
2195        }
2196
2197        enhanced_error
2198    }
2199
2200    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2201        // Decrement active requests
2202        self.reload_coordinator.dec_requests();
2203
2204        let duration = ctx.elapsed();
2205
2206        // Get response status
2207        let status = session
2208            .response_written()
2209            .map(|r| r.status.as_u16())
2210            .unwrap_or(0);
2211
2212        // Report result to load balancer for adaptive LB feedback
2213        // This enables latency-aware weight adjustment
2214        if let (Some(ref peer_addr), Some(ref upstream_id)) =
2215            (&ctx.selected_upstream_address, &ctx.upstream)
2216        {
2217            // Success = status code < 500 (client errors are not upstream failures)
2218            let success = status > 0 && status < 500;
2219
2220            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2221                pool.report_result_with_latency(peer_addr, success, Some(duration))
2222                    .await;
2223                trace!(
2224                    correlation_id = %ctx.trace_id,
2225                    upstream = %upstream_id,
2226                    peer_address = %peer_addr,
2227                    success = success,
2228                    duration_ms = duration.as_millis(),
2229                    status = status,
2230                    "Reported result to adaptive load balancer"
2231                );
2232            }
2233        }
2234
2235        // Write to access log file if configured
2236        if self.log_manager.access_log_enabled() {
2237            let access_entry = AccessLogEntry {
2238                timestamp: chrono::Utc::now().to_rfc3339(),
2239                trace_id: ctx.trace_id.clone(),
2240                method: ctx.method.clone(),
2241                path: ctx.path.clone(),
2242                query: ctx.query.clone(),
2243                protocol: "HTTP/1.1".to_string(),
2244                status,
2245                body_bytes: ctx.response_bytes,
2246                duration_ms: duration.as_millis() as u64,
2247                client_ip: ctx.client_ip.clone(),
2248                user_agent: ctx.user_agent.clone(),
2249                referer: ctx.referer.clone(),
2250                host: ctx.host.clone(),
2251                route_id: ctx.route_id.clone(),
2252                upstream: ctx.upstream.clone(),
2253                upstream_attempts: ctx.upstream_attempts,
2254                instance_id: self.app_state.instance_id.clone(),
2255                namespace: ctx.namespace.clone(),
2256                service: ctx.service.clone(),
2257                // New fields
2258                body_bytes_sent: ctx.response_bytes,
2259                upstream_addr: ctx.selected_upstream_address.clone(),
2260                connection_reused: ctx.connection_reused,
2261                rate_limit_hit: status == 429,
2262                geo_country: None, // TODO: Add GeoIP support
2263            };
2264            self.log_manager.log_access(&access_entry);
2265        }
2266
2267        // Log to tracing at debug level (avoid allocations if debug disabled)
2268        if tracing::enabled!(tracing::Level::DEBUG) {
2269            debug!(
2270                trace_id = %ctx.trace_id,
2271                method = %ctx.method,
2272                path = %ctx.path,
2273                route_id = ?ctx.route_id,
2274                upstream = ?ctx.upstream,
2275                status = status,
2276                duration_ms = duration.as_millis() as u64,
2277                upstream_attempts = ctx.upstream_attempts,
2278                error = ?_error.map(|e| e.to_string()),
2279                "Request completed"
2280            );
2281        }
2282
2283        // Log WebSocket upgrades at info level
2284        if ctx.is_websocket_upgrade && status == 101 {
2285            info!(
2286                trace_id = %ctx.trace_id,
2287                route_id = ?ctx.route_id,
2288                upstream = ?ctx.upstream,
2289                client_ip = %ctx.client_ip,
2290                "WebSocket connection established"
2291            );
2292        }
2293
2294        // End OpenTelemetry span
2295        if let Some(span) = ctx.otel_span.take() {
2296            span.end();
2297        }
2298    }
2299}
2300
2301// =============================================================================
2302// Helper methods for body streaming (not part of ProxyHttp trait)
2303// =============================================================================
2304
2305impl SentinelProxy {
2306    /// Process a single body chunk in streaming mode.
2307    async fn process_body_chunk_streaming(
2308        &self,
2309        body: &mut Option<Bytes>,
2310        end_of_stream: bool,
2311        ctx: &mut RequestContext,
2312    ) -> Result<(), Box<Error>> {
2313        // Clone the chunk data to avoid borrowing issues when mutating body later
2314        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2315        let chunk_index = ctx.request_body_chunk_index;
2316        ctx.request_body_chunk_index += 1;
2317        ctx.body_bytes_inspected += chunk_data.len() as u64;
2318
2319        debug!(
2320            correlation_id = %ctx.trace_id,
2321            chunk_index = chunk_index,
2322            chunk_size = chunk_data.len(),
2323            end_of_stream = end_of_stream,
2324            "Streaming body chunk to agents"
2325        );
2326
2327        // Create agent call context
2328        let agent_ctx = crate::agents::AgentCallContext {
2329            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2330            metadata: sentinel_agent_protocol::RequestMetadata {
2331                correlation_id: ctx.trace_id.clone(),
2332                request_id: ctx.trace_id.clone(),
2333                client_ip: ctx.client_ip.clone(),
2334                client_port: 0,
2335                server_name: ctx.host.clone(),
2336                protocol: "HTTP/1.1".to_string(),
2337                tls_version: None,
2338                tls_cipher: None,
2339                route_id: ctx.route_id.clone(),
2340                upstream_id: ctx.upstream.clone(),
2341                timestamp: chrono::Utc::now().to_rfc3339(),
2342                traceparent: ctx.traceparent(),
2343            },
2344            route_id: ctx.route_id.clone(),
2345            upstream_id: ctx.upstream.clone(),
2346            request_body: None, // Not used in streaming mode
2347            response_body: None,
2348        };
2349
2350        let agent_ids = ctx.body_inspection_agents.clone();
2351        let total_size = None; // Unknown in streaming mode
2352
2353        match self
2354            .agent_manager
2355            .process_request_body_streaming(
2356                &agent_ctx,
2357                &chunk_data,
2358                end_of_stream,
2359                chunk_index,
2360                ctx.body_bytes_inspected as usize,
2361                total_size,
2362                &agent_ids,
2363            )
2364            .await
2365        {
2366            Ok(decision) => {
2367                // Track if agent needs more data
2368                ctx.agent_needs_more = decision.needs_more;
2369
2370                // Apply body mutation if present
2371                if let Some(ref mutation) = decision.request_body_mutation {
2372                    if !mutation.is_pass_through() {
2373                        if mutation.is_drop() {
2374                            // Drop the chunk
2375                            *body = None;
2376                            trace!(
2377                                correlation_id = %ctx.trace_id,
2378                                chunk_index = chunk_index,
2379                                "Agent dropped body chunk"
2380                            );
2381                        } else if let Some(ref new_data) = mutation.data {
2382                            // Replace chunk with mutated content
2383                            *body = Some(Bytes::from(new_data.clone()));
2384                            trace!(
2385                                correlation_id = %ctx.trace_id,
2386                                chunk_index = chunk_index,
2387                                original_size = chunk_data.len(),
2388                                new_size = new_data.len(),
2389                                "Agent mutated body chunk"
2390                            );
2391                        }
2392                    }
2393                }
2394
2395                // Check decision (only final if needs_more is false)
2396                if !decision.needs_more && !decision.is_allow() {
2397                    warn!(
2398                        correlation_id = %ctx.trace_id,
2399                        action = ?decision.action,
2400                        "Agent blocked request body"
2401                    );
2402                    self.metrics.record_blocked_request("agent_body_inspection");
2403
2404                    let (status, message) = match &decision.action {
2405                        crate::agents::AgentAction::Block { status, body, .. } => (
2406                            *status,
2407                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2408                        ),
2409                        _ => (403, "Forbidden".to_string()),
2410                    };
2411
2412                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2413                }
2414
2415                trace!(
2416                    correlation_id = %ctx.trace_id,
2417                    needs_more = decision.needs_more,
2418                    "Agent processed body chunk"
2419                );
2420            }
2421            Err(e) => {
2422                let fail_closed = ctx
2423                    .route_config
2424                    .as_ref()
2425                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2426                    .unwrap_or(false);
2427
2428                if fail_closed {
2429                    error!(
2430                        correlation_id = %ctx.trace_id,
2431                        error = %e,
2432                        "Agent streaming body inspection failed, blocking (fail-closed)"
2433                    );
2434                    return Err(Error::explain(
2435                        ErrorType::HTTPStatus(503),
2436                        "Service unavailable",
2437                    ));
2438                } else {
2439                    warn!(
2440                        correlation_id = %ctx.trace_id,
2441                        error = %e,
2442                        "Agent streaming body inspection failed, allowing (fail-open)"
2443                    );
2444                }
2445            }
2446        }
2447
2448        Ok(())
2449    }
2450
2451    /// Send buffered body to agents (buffer mode).
2452    async fn send_buffered_body_to_agents(
2453        &self,
2454        end_of_stream: bool,
2455        ctx: &mut RequestContext,
2456    ) -> Result<(), Box<Error>> {
2457        debug!(
2458            correlation_id = %ctx.trace_id,
2459            buffer_size = ctx.body_buffer.len(),
2460            end_of_stream = end_of_stream,
2461            agent_count = ctx.body_inspection_agents.len(),
2462            decompression_enabled = ctx.decompression_enabled,
2463            "Sending buffered body to agents for inspection"
2464        );
2465
2466        // Decompress body if enabled and we have a supported encoding
2467        let body_for_inspection = if ctx.decompression_enabled {
2468            if let Some(ref encoding) = ctx.body_content_encoding {
2469                let config = crate::decompression::DecompressionConfig {
2470                    max_ratio: ctx.max_decompression_ratio,
2471                    max_output_bytes: ctx.max_decompression_bytes,
2472                };
2473
2474                match crate::decompression::decompress_body(
2475                    &ctx.body_buffer,
2476                    encoding,
2477                    &config,
2478                ) {
2479                    Ok(result) => {
2480                        ctx.body_was_decompressed = true;
2481                        self.metrics
2482                            .record_decompression_success(encoding, result.ratio);
2483                        debug!(
2484                            correlation_id = %ctx.trace_id,
2485                            encoding = %encoding,
2486                            compressed_size = result.compressed_size,
2487                            decompressed_size = result.decompressed_size,
2488                            ratio = result.ratio,
2489                            "Body decompressed for agent inspection"
2490                        );
2491                        result.data
2492                    }
2493                    Err(e) => {
2494                        // Record failure metric
2495                        let failure_reason = match &e {
2496                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
2497                                "ratio_exceeded"
2498                            }
2499                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
2500                                "size_exceeded"
2501                            }
2502                            crate::decompression::DecompressionError::InvalidData { .. } => {
2503                                "invalid_data"
2504                            }
2505                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2506                                "unsupported"
2507                            }
2508                            crate::decompression::DecompressionError::IoError(_) => "io_error",
2509                        };
2510                        self.metrics
2511                            .record_decompression_failure(encoding, failure_reason);
2512
2513                        // Decompression failed - decide based on failure mode
2514                        let fail_closed = ctx
2515                            .route_config
2516                            .as_ref()
2517                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2518                            .unwrap_or(false);
2519
2520                        if fail_closed {
2521                            error!(
2522                                correlation_id = %ctx.trace_id,
2523                                error = %e,
2524                                encoding = %encoding,
2525                                "Decompression failed, blocking (fail-closed)"
2526                            );
2527                            return Err(Error::explain(
2528                                ErrorType::HTTPStatus(400),
2529                                "Invalid compressed body",
2530                            ));
2531                        } else {
2532                            warn!(
2533                                correlation_id = %ctx.trace_id,
2534                                error = %e,
2535                                encoding = %encoding,
2536                                "Decompression failed, sending compressed body (fail-open)"
2537                            );
2538                            ctx.body_buffer.clone()
2539                        }
2540                    }
2541                }
2542            } else {
2543                ctx.body_buffer.clone()
2544            }
2545        } else {
2546            ctx.body_buffer.clone()
2547        };
2548
2549        let agent_ctx = crate::agents::AgentCallContext {
2550            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2551            metadata: sentinel_agent_protocol::RequestMetadata {
2552                correlation_id: ctx.trace_id.clone(),
2553                request_id: ctx.trace_id.clone(),
2554                client_ip: ctx.client_ip.clone(),
2555                client_port: 0,
2556                server_name: ctx.host.clone(),
2557                protocol: "HTTP/1.1".to_string(),
2558                tls_version: None,
2559                tls_cipher: None,
2560                route_id: ctx.route_id.clone(),
2561                upstream_id: ctx.upstream.clone(),
2562                timestamp: chrono::Utc::now().to_rfc3339(),
2563                traceparent: ctx.traceparent(),
2564            },
2565            route_id: ctx.route_id.clone(),
2566            upstream_id: ctx.upstream.clone(),
2567            request_body: Some(body_for_inspection.clone()),
2568            response_body: None,
2569        };
2570
2571        let agent_ids = ctx.body_inspection_agents.clone();
2572        match self
2573            .agent_manager
2574            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2575            .await
2576        {
2577            Ok(decision) => {
2578                if !decision.is_allow() {
2579                    warn!(
2580                        correlation_id = %ctx.trace_id,
2581                        action = ?decision.action,
2582                        "Agent blocked request body"
2583                    );
2584                    self.metrics.record_blocked_request("agent_body_inspection");
2585
2586                    let (status, message) = match &decision.action {
2587                        crate::agents::AgentAction::Block { status, body, .. } => (
2588                            *status,
2589                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2590                        ),
2591                        _ => (403, "Forbidden".to_string()),
2592                    };
2593
2594                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2595                }
2596
2597                trace!(
2598                    correlation_id = %ctx.trace_id,
2599                    "Agent allowed request body"
2600                );
2601            }
2602            Err(e) => {
2603                let fail_closed = ctx
2604                    .route_config
2605                    .as_ref()
2606                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2607                    .unwrap_or(false);
2608
2609                if fail_closed {
2610                    error!(
2611                        correlation_id = %ctx.trace_id,
2612                        error = %e,
2613                        "Agent body inspection failed, blocking (fail-closed)"
2614                    );
2615                    return Err(Error::explain(
2616                        ErrorType::HTTPStatus(503),
2617                        "Service unavailable",
2618                    ));
2619                } else {
2620                    warn!(
2621                        correlation_id = %ctx.trace_id,
2622                        error = %e,
2623                        "Agent body inspection failed, allowing (fail-open)"
2624                    );
2625                }
2626            }
2627        }
2628
2629        Ok(())
2630    }
2631}