sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.
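//!
//! At a glance, the implementation covers route matching and upstream selection
//! (`upstream_peer`), request admission checks such as rate limiting, geo
//! filtering, header limits and agent processing (`request_filter`), body
//! inspection (`request_body_filter` / `response_body_filter`), header rewriting
//! for the upstream hop (`upstream_request_filter`), response post-processing
//! (`response_filter`), and `fail_to_connect` for upstream connection errors.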

use async_trait::async_trait;
use bytes::Bytes;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora::protocols::Digest;
use pingora::proxy::{ProxyHttp, Session};
use pingora::upstreams::peer::Peer;
use pingora_cache::{
    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
};
use pingora_timeout::sleep;
use std::os::unix::io::RawFd;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
use crate::rate_limit::HeaderAccessor;
use crate::routing::RequestInfo;

use super::context::RequestContext;
use super::SentinelProxy;

/// Helper type for rate limiting when we don't need header access
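///
/// Passed to `rate_limit_manager.check` as `Option::<&NoHeaderAccessor>::None`
/// (see `request_filter` below) so that no header map has to be built when no
/// limiter keys off request headers.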
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}

#[async_trait]
impl ProxyHttp for SentinelProxy {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::new()
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        error!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
            peer_address = %peer.address(),
            error = %e,
            "Failed to connect to upstream peer"
        );
        // Custom error pages are handled in response_filter
        e
    }

    /// Early request filter - runs before upstream selection
    /// Used to handle builtin routes that don't need an upstream connection
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Parse incoming W3C trace context if present
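        // A valid value has the W3C form
        // `00-<32-hex trace id>-<16-hex parent span id>-<2-hex flags>`, e.g.
        // `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01`.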
        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
            if let Ok(s) = traceparent.to_str() {
                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
            }
        }

        // Start OpenTelemetry request span if tracing is enabled
        if let Some(tracer) = crate::otel::get_tracer() {
            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
        }

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            // Set trace_id if not already set by early_request_filter
            if ctx.trace_id.is_empty() {
                ctx.trace_id = self.get_trace_id(session);

                // Parse incoming W3C trace context if present
                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
                {
                    if let Ok(s) = traceparent.to_str() {
                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
                    }
                }

                // Start OpenTelemetry request span if tracing is enabled
                if let Some(tracer) = crate::otel::get_tracer() {
                    ctx.otel_span = Some(tracer.start_span(
                        &ctx.method,
                        &ctx.path,
                        ctx.trace_context.as_ref(),
                    ));
                }
            }

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // Regular route with upstream
        if let Some(ref upstream) = route_match.config.upstream {
            ctx.upstream = Some(upstream.clone());
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                upstream = %upstream,
                "Upstream configured for route"
            );
        } else {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route has no upstream configured"
            );
            return Err(Error::explain(
                ErrorType::InternalError,
                format!(
                    "Route '{}' has no upstream configured",
                    route_match.route_id
                ),
            ));
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer(None).await {
                Ok(peer) => {
                    let selection_duration = selection_start.elapsed();
                    // Store selected peer address for feedback reporting in logging()
                    let peer_addr = peer.address().to_string();
                    ctx.selected_upstream_address = Some(peer_addr.clone());
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer_addr,
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency)
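                        // Doubling schedule from a 100 ms base: attempt 1 waits
                        // 100 ms, attempt 2 waits 200 ms, attempt 3 waits 400 ms, ...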
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }

    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool, Box<Error>> {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Starting request filter phase"
        );

        // Check rate limiting early (before other processing)
        // Fast path: skip if no rate limiting is configured for this route
        if let Some(route_id) = ctx.route_id.as_deref() {
            if self.rate_limit_manager.has_route_limiter(route_id) {
                let rate_result = self.rate_limit_manager.check(
                    route_id,
                    &ctx.client_ip,
                    &ctx.path,
                    Option::<&NoHeaderAccessor>::None,
                );

                // Store rate limit info for response headers (even if allowed)
                if rate_result.limit > 0 {
                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
                        limit: rate_result.limit,
                        remaining: rate_result.remaining,
                        reset_at: rate_result.reset_at,
                    });
                }

                if !rate_result.allowed {
                    use sentinel_config::RateLimitAction;

                    match rate_result.action {
                        RateLimitAction::Reject => {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                limiter = %rate_result.limiter,
                                limit = rate_result.limit,
                                remaining = rate_result.remaining,
                                "Request rate limited"
                            );
                            self.metrics.record_blocked_request("rate_limited");

                            // Audit log the rate limit
                            let audit_entry = AuditLogEntry::rate_limited(
                                &ctx.trace_id,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                                &rate_result.limiter,
                            )
                            .with_route_id(route_id)
                            .with_status_code(rate_result.status_code);
                            self.log_manager.log_audit(&audit_entry);

                            // Send rate limit response with headers
                            let body = rate_result
                                .message
                                .unwrap_or_else(|| "Rate limit exceeded".to_string());

                            // Build response with rate limit headers
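                            // Retry-After is derived from the limiter's reset time:
                            // reset_at (Unix seconds) minus "now", saturating at 0.
                            // E.g. (hypothetical values) reset_at = now + 30 yields
                            // a Retry-After of 30 seconds.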
                            let retry_after = rate_result.reset_at.saturating_sub(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_secs(),
                            );
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                rate_result.status_code,
                                &body,
                                rate_result.limit,
                                rate_result.remaining,
                                rate_result.reset_at,
                                retry_after,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }
                        RateLimitAction::LogOnly => {
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit exceeded (log only mode)"
                            );
                            // Continue processing
                        }
                        RateLimitAction::Delay => {
                            // Apply delay if suggested by rate limiter
                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
                                // Cap delay at the configured maximum
                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);

                                if actual_delay > 0 {
                                    debug!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        suggested_delay_ms = delay_ms,
                                        max_delay_ms = rate_result.max_delay_ms,
                                        actual_delay_ms = actual_delay,
                                        "Applying rate limit delay"
                                    );

                                    tokio::time::sleep(std::time::Duration::from_millis(
                                        actual_delay,
                                    ))
                                    .await;
                                }
                            }
                            // Continue processing after delay
                        }
                    }
                }
            }
        }

        // Geo filtering
        if let Some(route_id) = ctx.route_id.as_deref() {
            if let Some(ref route_config) = ctx.route_config {
                for filter_id in &route_config.filters {
                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
                        // Store country code for response header
                        ctx.geo_country_code = result.country_code.clone();
                        ctx.geo_lookup_performed = true;

                        if !result.allowed {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                country = ?result.country_code,
                                filter_id = %filter_id,
                                "Request blocked by geo filter"
                            );
                            self.metrics.record_blocked_request("geo_blocked");

                            // Audit log the geo block
                            let audit_entry = AuditLogEntry::new(
                                &ctx.trace_id,
                                AuditEventType::Blocked,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                            )
                            .with_route_id(route_id)
                            .with_status_code(result.status_code)
                            .with_reason(format!(
                                "Geo blocked: country={}, filter={}",
                                result.country_code.as_deref().unwrap_or("unknown"),
                                filter_id
                            ));
                            self.log_manager.log_audit(&audit_entry);

                            // Send geo block response
                            let body = result
                                .block_message
                                .unwrap_or_else(|| "Access denied".to_string());

                            crate::http_helpers::write_error(
                                session,
                                result.status_code,
                                &body,
                                "text/plain",
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }

                        // Only check first geo filter that matches
                        break;
                    }
                }
            }
        }

        // Check for WebSocket upgrade requests
        let is_websocket_upgrade = session
            .req_header()
            .headers
            .get(http::header::UPGRADE)
            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
            .unwrap_or(false);

        if is_websocket_upgrade {
            ctx.is_websocket_upgrade = true;

            // Check if route allows WebSocket upgrades
            if let Some(ref route_config) = ctx.route_config {
                if !route_config.websocket {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                        client_ip = %ctx.client_ip,
                        "WebSocket upgrade rejected: not enabled for route"
                    );

                    self.metrics.record_blocked_request("websocket_not_enabled");

                    // Audit log the rejection
                    let audit_entry = AuditLogEntry::new(
                        &ctx.trace_id,
                        AuditEventType::Blocked,
                        &ctx.method,
                        &ctx.path,
                        &ctx.client_ip,
                    )
                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
                    .with_action("websocket_rejected")
                    .with_reason("WebSocket not enabled for route");
                    self.log_manager.log_audit(&audit_entry);

                    // Send 403 Forbidden response
                    crate::http_helpers::write_error(
                        session,
                        403,
                        "WebSocket not enabled for this route",
                        "text/plain",
                    )
                    .await?;
                    return Ok(true); // Request complete, don't continue
                }

                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade request allowed"
                );

                // Check for WebSocket frame inspection
                if route_config.websocket_inspection {
                    // Check for compression negotiation - skip inspection if permessage-deflate
                    let has_compression = session
                        .req_header()
                        .headers
                        .get("Sec-WebSocket-Extensions")
                        .and_then(|v| v.to_str().ok())
                        .map(|s| s.contains("permessage-deflate"))
                        .unwrap_or(false);

                    if has_compression {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            "WebSocket inspection skipped: permessage-deflate negotiated"
                        );
                        ctx.websocket_skip_inspection = true;
                    } else {
                        ctx.websocket_inspection_enabled = true;

                        // Get agents that handle WebSocketFrame events
                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
                            sentinel_agent_protocol::EventType::WebSocketFrame,
                        );

                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            agent_count = ctx.websocket_inspection_agents.len(),
                            "WebSocket frame inspection enabled"
                        );
                    }
                }
            }
        }

        // Use cached route config from upstream_peer (avoids duplicate route matching)
        // Handle static file and builtin routes
        if let Some(route_config) = ctx.route_config.clone() {
            if route_config.service_type == sentinel_config::ServiceType::Static {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "Handling static file route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_static_route(session, ctx, &route_match).await;
            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    builtin_handler = ?route_config.builtin_handler,
                    "Handling builtin route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_builtin_route(session, ctx, &route_match).await;
            }
        }

        // API validation for API routes
        if let Some(route_id) = ctx.route_id.clone() {
            if let Some(validator) = self.validators.get(&route_id).await {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_id,
                    "Running API schema validation"
                );
                if let Some(result) = self
                    .validate_api_request(session, ctx, &route_id, &validator)
                    .await?
                {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_id,
                        validation_passed = result,
                        "API validation complete"
                    );
                    return Ok(result);
                }
            }
        }

        // Get client address before mutable borrow
        let client_addr = session
            .client_addr()
            .map(|a| format!("{}", a))
            .unwrap_or_else(|| "unknown".to_string());
        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
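        // Note: only the address string above is populated; the port passed to
        // process_agents below is currently always 0.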

        let req_header = session.req_header_mut();

        // Add correlation ID header
        req_header
            .insert_header("X-Correlation-Id", &ctx.trace_id)
            .ok();
        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();

        // Use cached config (set in upstream_peer, or fetch now if needed)
        let config = ctx
            .config
            .get_or_insert_with(|| self.config_manager.current());

        // Enforce header limits (fast path: skip if limits are very high)
        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // Limits at or above this are treated as effectively unlimited

        // Header count check - O(1)
        let header_count = req_header.headers.len();
        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
            && header_count > config.limits.max_header_count
        {
            warn!(
                correlation_id = %ctx.trace_id,
                header_count = header_count,
                limit = config.limits.max_header_count,
                "Request blocked: exceeds header count limit"
            );

            self.metrics.record_blocked_request("header_count_exceeded");
            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
        }

        // Header size check - O(n), skip if limit is very high
        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
            let total_header_size: usize = req_header
                .headers
                .iter()
                .map(|(k, v)| k.as_str().len() + v.len())
                .sum();

            if total_header_size > config.limits.max_header_size_bytes {
                warn!(
                    correlation_id = %ctx.trace_id,
                    header_size = total_header_size,
                    limit = config.limits.max_header_size_bytes,
                    "Request blocked: exceeds header size limit"
                );

                self.metrics.record_blocked_request("header_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Headers too large",
                ));
            }
        }

        // Process through external agents
        trace!(
            correlation_id = %ctx.trace_id,
            "Processing request through agents"
        );
        if let Err(e) = self
            .process_agents(session, ctx, &client_addr, client_port)
            .await
        {
            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
            // In that case, we need to send a proper HTTP response instead of just closing the connection
            if let ErrorType::HTTPStatus(status) = e.etype() {
                // Extract the message from the error (the context part after "HTTPStatus context:")
                let error_msg = e.to_string();
                let body = error_msg
                    .split("context:")
                    .nth(1)
                    .map(|s| s.trim())
                    .unwrap_or("Request blocked");
                debug!(
                    correlation_id = %ctx.trace_id,
                    status = status,
                    body = %body,
                    "Sending HTTP error response for agent block"
                );
                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
                return Ok(true); // Request complete, don't continue to upstream
            }
            // For other errors, propagate them
            return Err(e);
        }

        trace!(
            correlation_id = %ctx.trace_id,
            "Request filter phase complete, forwarding to upstream"
        );

        Ok(false) // Continue processing
    }

    /// Process incoming request body chunks.
    /// Used for body size enforcement and WAF/agent inspection.
    ///
    /// Supports three modes:
    /// - **Buffer mode** (default): Buffer chunks until end of stream or the inspection limit, then send to agents
    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
    /// - **Hybrid mode**: Buffer up to a configured threshold, then stream the remainder
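    ///
    /// Illustrative Hybrid flow (threshold value is hypothetical): with a
    /// `buffer_threshold` of 64 KiB, the first 64 KiB is buffered and sent to
    /// agents as one inspection payload; any remaining chunks are then
    /// forwarded to agents individually as they arrive.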
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        use sentinel_config::BodyStreamingMode;

        // Handle WebSocket frame inspection (client -> server)
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let result = handler.process_client_data(body.take()).await;
                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (client->server)"
                        );
                        // Return an error to close the connection
                        return Err(Error::explain(
                            ErrorType::InternalError,
                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
                        ));
                    }
                }
            }
            // Skip normal body processing for WebSocket
            return Ok(());
        }

        // Track request body size
        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
        if chunk_len > 0 {
            ctx.request_body_bytes += chunk_len as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk_len,
                total_body_bytes = ctx.request_body_bytes,
                end_of_stream = end_of_stream,
                streaming_mode = ?ctx.request_body_streaming_mode,
                "Processing request body chunk"
            );

            // Check body size limit (use cached config)
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
                warn!(
                    correlation_id = %ctx.trace_id,
                    body_bytes = ctx.request_body_bytes,
                    limit = config.limits.max_body_size_bytes,
                    "Request body size limit exceeded"
                );
                self.metrics.record_blocked_request("body_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Request body too large",
                ));
            }
        }

        // Body inspection for agents (WAF, etc.)
        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            let max_inspection_bytes = config
                .waf
                .as_ref()
                .map(|w| w.body_inspection.max_inspection_bytes as u64)
                .unwrap_or(1024 * 1024);
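            // The 1 MiB fallback above applies when no WAF body-inspection
            // config is present.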

            match ctx.request_body_streaming_mode {
                BodyStreamingMode::Stream => {
                    // Stream mode: send each chunk immediately
                    if body.is_some() {
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    } else if end_of_stream && ctx.agent_needs_more {
                        // Send final empty chunk to signal end
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Hybrid { buffer_threshold } => {
                    // Hybrid mode: buffer up to threshold, then stream
                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
                        // Still in buffering phase
                        if let Some(ref chunk) = body {
                            let bytes_to_buffer = std::cmp::min(
                                chunk.len(),
                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
                            );
                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
                            ctx.body_bytes_inspected += bytes_to_buffer as u64;

                            // If we've reached threshold or end of stream, switch to streaming
                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
                            {
                                // Send buffered content first
                                self.send_buffered_body_to_agents(
                                    end_of_stream && chunk.len() == bytes_to_buffer,
                                    ctx,
                                )
                                .await?;
                                ctx.body_buffer.clear();

                                // If there's remaining data in this chunk, stream it
                                if bytes_to_buffer < chunk.len() {
                                    let remaining = chunk.slice(bytes_to_buffer..);
                                    let mut remaining_body = Some(remaining);
                                    self.process_body_chunk_streaming(
                                        &mut remaining_body,
                                        end_of_stream,
                                        ctx,
                                    )
                                    .await?;
                                }
                            }
                        }
                    } else {
                        // Past threshold, stream directly
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Buffer => {
                    // Buffer mode: collect chunks until ready to send
                    if let Some(ref chunk) = body {
                        if ctx.body_bytes_inspected < max_inspection_bytes {
                            let bytes_to_inspect = std::cmp::min(
                                chunk.len() as u64,
                                max_inspection_bytes - ctx.body_bytes_inspected,
                            ) as usize;

                            ctx.body_buffer
                                .extend_from_slice(&chunk[..bytes_to_inspect]);
                            ctx.body_bytes_inspected += bytes_to_inspect as u64;

                            trace!(
                                correlation_id = %ctx.trace_id,
                                bytes_inspected = ctx.body_bytes_inspected,
                                max_inspection_bytes = max_inspection_bytes,
                                buffer_size = ctx.body_buffer.len(),
                                "Buffering body for agent inspection"
                            );
                        }
                    }

                    // Send when complete or limit reached
                    let should_send =
                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
                    if should_send && !ctx.body_buffer.is_empty() {
                        self.send_buffered_body_to_agents(end_of_stream, ctx)
                            .await?;
                        ctx.body_buffer.clear();
                    }
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_body_bytes = ctx.request_body_bytes,
                bytes_inspected = ctx.body_bytes_inspected,
                "Request body complete"
            );
        }

        Ok(())
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let status = upstream_response.status.as_u16();
        let duration = ctx.elapsed();

        trace!(
            correlation_id = %ctx.trace_id,
            status = status,
            "Starting response filter phase"
        );

        // Handle WebSocket 101 Switching Protocols
        if status == 101 && ctx.is_websocket_upgrade {
            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
                // Create WebSocket inspector and handler with metrics
                let inspector = crate::websocket::WebSocketInspector::with_metrics(
                    self.agent_manager.clone(),
                    ctx.route_id
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string()),
                    ctx.trace_id.clone(),
                    ctx.client_ip.clone(),
                    100, // 100ms timeout per frame inspection
                    Some(self.metrics.clone()),
                );

                let handler = crate::websocket::WebSocketHandler::new(
                    std::sync::Arc::new(inspector),
                    1024 * 1024, // 1MB max frame size
                );

                ctx.websocket_handler = Some(std::sync::Arc::new(handler));

                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    agent_count = ctx.websocket_inspection_agents.len(),
                    "WebSocket upgrade successful, frame inspection enabled"
                );
            } else if ctx.websocket_skip_inspection {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
                );
            } else {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful"
                );
            }
        }

        // Apply security headers
        trace!(
            correlation_id = %ctx.trace_id,
            "Applying security headers"
        );
        self.apply_security_headers(upstream_response).ok();

        // Add correlation ID to response
        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;

        // Add rate limit headers if rate limiting was applied
        if let Some(ref rate_info) = ctx.rate_limit_info {
            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
            upstream_response
                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
        }
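        // Illustrative headers emitted above (hypothetical values): a client that
        // has used 40 of 100 requests in the current window would see
        //   X-RateLimit-Limit: 100
        //   X-RateLimit-Remaining: 60
        //   X-RateLimit-Reset: 1700000060 (Unix seconds)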
1171
1172        // Add GeoIP country header if geo lookup was performed
1173        if let Some(ref country_code) = ctx.geo_country_code {
1174            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1175        }
1176
1177        // Generate custom error pages for error responses
1178        if status >= 400 {
1179            trace!(
1180                correlation_id = %ctx.trace_id,
1181                status = status,
1182                "Handling error response"
1183            );
1184            self.handle_error_response(upstream_response, ctx).await?;
1185        }
1186
1187        // Record metrics
1188        self.metrics.record_request(
1189            ctx.route_id.as_deref().unwrap_or("unknown"),
1190            &ctx.method,
1191            status,
1192            duration,
1193        );
1194
1195        // Record OpenTelemetry span status
1196        if let Some(ref mut span) = ctx.otel_span {
1197            span.set_status(status);
1198            if let Some(ref upstream) = ctx.upstream {
1199                span.set_upstream(upstream, "");
1200            }
1201            if status >= 500 {
1202                span.record_error(&format!("HTTP {}", status));
1203            }
1204        }
1205
1206        // Record passive health check
1207        if let Some(ref upstream) = ctx.upstream {
1208            let success = status < 500;
1209
1210            trace!(
1211                correlation_id = %ctx.trace_id,
1212                upstream = %upstream,
1213                success = success,
1214                status = status,
1215                "Recording passive health check result"
1216            );
1217
1218            let error_msg = if !success {
1219                Some(format!("HTTP {}", status))
1220            } else {
1221                None
1222            };
1223            self.passive_health
1224                .record_outcome(upstream, success, error_msg.as_deref())
1225                .await;
1226
1227            // Report to upstream pool
1228            if let Some(pool) = self.upstream_pools.get(upstream).await {
1229                pool.report_result(upstream, success).await;
1230            }
1231
1232            if !success {
1233                warn!(
1234                    correlation_id = %ctx.trace_id,
1235                    upstream = %upstream,
1236                    status = status,
1237                    "Upstream returned error status"
1238                );
1239            }
1240        }
1241
1242        // Final request completion log
1243        if status >= 500 {
1244            error!(
1245                correlation_id = %ctx.trace_id,
1246                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1247                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1248                method = %ctx.method,
1249                path = %ctx.path,
1250                status = status,
1251                duration_ms = duration.as_millis(),
1252                attempts = ctx.upstream_attempts,
1253                "Request completed with server error"
1254            );
1255        } else if status >= 400 {
1256            warn!(
1257                correlation_id = %ctx.trace_id,
1258                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1259                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1260                method = %ctx.method,
1261                path = %ctx.path,
1262                status = status,
1263                duration_ms = duration.as_millis(),
1264                "Request completed with client error"
1265            );
1266        } else {
1267            debug!(
1268                correlation_id = %ctx.trace_id,
1269                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1271                method = %ctx.method,
1272                path = %ctx.path,
1273                status = status,
1274                duration_ms = duration.as_millis(),
1275                attempts = ctx.upstream_attempts,
1276                "Request completed"
1277            );
1278        }
1279
1280        Ok(())
1281    }
1282
1283    /// Modify the request before sending to upstream.
1284    /// Used for header modifications, adding authentication, etc.
1285    async fn upstream_request_filter(
1286        &self,
1287        _session: &mut Session,
1288        upstream_request: &mut pingora::http::RequestHeader,
1289        ctx: &mut Self::CTX,
1290    ) -> Result<()>
1291    where
1292        Self::CTX: Send + Sync,
1293    {
1294        trace!(
1295            correlation_id = %ctx.trace_id,
1296            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1297            "Applying upstream request modifications"
1298        );
1299
1300        // Add trace ID header for upstream correlation
1301        upstream_request
1302            .insert_header("X-Trace-Id", &ctx.trace_id)
1303            .ok();
1304
1305        // Add W3C traceparent header for distributed tracing
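        // Hedged illustration (format from the W3C Trace Context spec, not this
        // codebase): the header value is "{version}-{trace-id}-{parent-id}-{flags}",
        // e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
        // where the trailing "01" marks the span as sampled.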
1306        if let Some(ref span) = ctx.otel_span {
1307            let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1308            let traceparent =
1309                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1310            upstream_request
1311                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1312                .ok();
1313        }
1314
1315        // Add request metadata headers
1316        upstream_request
1317            .insert_header("X-Forwarded-By", "Sentinel")
1318            .ok();
1319
1320        // Apply route-specific request header modifications
1321        // Clone the modifications to avoid lifetime issues with the header API
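        // Hedged example of the semantics applied below (the header names are
        // illustrative only):
        //   set    { "X-Env": "prod" }   -> replaces any existing X-Env value
        //   add    { "Via": "sentinel" } -> appends an additional Via value
        //   remove [ "X-Debug" ]         -> strips the header entirely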
1322        if let Some(ref route_config) = ctx.route_config {
1323            let mods = route_config.policies.request_headers.clone();
1324
1325            // Set headers (overwrite existing)
1326            for (name, value) in mods.set {
1327                upstream_request.insert_header(name, value).ok();
1328            }
1329
1330            // Add headers (append)
1331            for (name, value) in mods.add {
1332                upstream_request.append_header(name, value).ok();
1333            }
1334
1335            // Remove headers
1336            for name in &mods.remove {
1337                upstream_request.remove_header(name);
1338            }
1339
1340            trace!(
1341                correlation_id = %ctx.trace_id,
1342                "Applied request header modifications"
1343            );
1344        }
1345
1346        // Remove sensitive headers that shouldn't go to upstream
1347        upstream_request.remove_header("X-Internal-Token");
1348        upstream_request.remove_header("Authorization-Internal");
1349
1350        Ok(())
1351    }
1352
1353    /// Process response body chunks from upstream.
1354    /// Used for response size tracking and WAF inspection.
1355    ///
1356    /// Note: Response body inspection is currently buffered only (streaming mode not supported
1357    /// for responses due to Pingora's synchronous filter design).
1358    fn response_body_filter(
1359        &self,
1360        _session: &mut Session,
1361        body: &mut Option<Bytes>,
1362        end_of_stream: bool,
1363        ctx: &mut Self::CTX,
1364    ) -> Result<Option<Duration>, Box<Error>> {
1365        // Handle WebSocket frame inspection (server -> client)
1366        // Note: This filter is synchronous, so we use block_in_place for async agent calls
1367        if ctx.is_websocket_upgrade {
1368            if let Some(ref handler) = ctx.websocket_handler {
1369                let handler = handler.clone();
1370                let data = body.take();
1371
1372                // Use block_in_place to run async handler from sync context
1373                // This is safe because Pingora uses a multi-threaded tokio runtime
1374                let result = tokio::task::block_in_place(|| {
1375                    tokio::runtime::Handle::current()
1376                        .block_on(async { handler.process_server_data(data).await })
1377                });
1378
1379                match result {
1380                    crate::websocket::ProcessResult::Forward(data) => {
1381                        *body = data;
1382                    }
1383                    crate::websocket::ProcessResult::Close(reason) => {
1384                        warn!(
1385                            correlation_id = %ctx.trace_id,
1386                            code = reason.code,
1387                            reason = %reason.reason,
1388                            "WebSocket connection closed by agent (server->client)"
1389                        );
1390                        // Returning an error here would drop the stream without a close
1391                        // handshake; instead, inject a WebSocket close frame for a graceful shutdown
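                        // Per RFC 6455 a close frame payload carries a 2-byte status
                        // code (e.g. 1008 "policy violation") followed by an optional
                        // UTF-8 reason string.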
1392                        let close_frame =
1393                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1394                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1395                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1396                            *body = Some(Bytes::from(encoded));
1397                        }
1398                    }
1399                }
1400            }
1401            // Skip normal body processing for WebSocket
1402            return Ok(None);
1403        }
1404
1405        // Track response body size
1406        if let Some(ref chunk) = body {
1407            ctx.response_bytes += chunk.len() as u64;
1408
1409            trace!(
1410                correlation_id = %ctx.trace_id,
1411                chunk_size = chunk.len(),
1412                total_response_bytes = ctx.response_bytes,
1413                end_of_stream = end_of_stream,
1414                "Processing response body chunk"
1415            );
1416
1417            // Response body inspection (buffered mode only)
1418            // Note: Streaming mode for response bodies is not currently supported
1419            // due to Pingora's synchronous response_body_filter design
1420            if ctx.response_body_inspection_enabled
1421                && !ctx.response_body_inspection_agents.is_empty()
1422            {
1423                let config = ctx
1424                    .config
1425                    .get_or_insert_with(|| self.config_manager.current());
1426                let max_inspection_bytes = config
1427                    .waf
1428                    .as_ref()
1429                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
1430                    .unwrap_or(1024 * 1024);
1431
1432                if ctx.response_body_bytes_inspected < max_inspection_bytes {
1433                    let bytes_to_inspect = std::cmp::min(
1434                        chunk.len() as u64,
1435                        max_inspection_bytes - ctx.response_body_bytes_inspected,
1436                    ) as usize;
1437
1438                    // Only counters are tracked here (no chunk data is buffered);
1439                    // inspection results are reported asynchronously during the
1440                    // logging phase rather than blocking the response
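                    // Worked example of the cap: with the default 1 MiB limit and
                    // 64 KiB chunks, only the first 16 chunks count toward
                    // inspection; later chunks still flow to the client untouched.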
1441                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1442                    ctx.response_body_chunk_index += 1;
1443
1444                    trace!(
1445                        correlation_id = %ctx.trace_id,
1446                        bytes_inspected = ctx.response_body_bytes_inspected,
1447                        max_inspection_bytes = max_inspection_bytes,
1448                        chunk_index = ctx.response_body_chunk_index,
1449                        "Tracking response body for inspection"
1450                    );
1451                }
1452            }
1453        }
1454
1455        if end_of_stream {
1456            trace!(
1457                correlation_id = %ctx.trace_id,
1458                total_response_bytes = ctx.response_bytes,
1459                response_bytes_inspected = ctx.response_body_bytes_inspected,
1460                "Response body complete"
1461            );
1462        }
1463
1464        // Return None to indicate no delay needed
1465        Ok(None)
1466    }
1467
1468    /// Called when a connection to upstream is established or reused.
1469    /// Logs connection reuse statistics for observability.
1470    async fn connected_to_upstream(
1471        &self,
1472        _session: &mut Session,
1473        reused: bool,
1474        peer: &HttpPeer,
1475        #[cfg(unix)] _fd: RawFd,
1476        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1477        digest: Option<&Digest>,
1478        ctx: &mut Self::CTX,
1479    ) -> Result<(), Box<Error>> {
1480        // Track connection reuse for metrics
1481        ctx.connection_reused = reused;
1482
1483        // Log connection establishment/reuse
1484        if reused {
1485            trace!(
1486                correlation_id = %ctx.trace_id,
1487                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1488                peer_address = %peer.address(),
1489                "Reusing existing upstream connection"
1490            );
1491        } else {
1492            debug!(
1493                correlation_id = %ctx.trace_id,
1494                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1495                peer_address = %peer.address(),
1496                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1497                "Established new upstream connection"
1498            );
1499        }
1500
1501        Ok(())
1502    }
1503
1504    // =========================================================================
1505    // HTTP Caching - Pingora Cache Integration
1506    // =========================================================================
1507
1508    /// Decide if the request should use caching.
1509    ///
1510    /// This method is called early in the request lifecycle to determine if
1511    /// the response should be served from cache or if the response should
1512    /// be cached.
1513    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1514        // Check if route has caching enabled
1515        let route_id = match ctx.route_id.as_deref() {
1516            Some(id) => id,
1517            None => {
1518                trace!(
1519                    correlation_id = %ctx.trace_id,
1520                    "Cache filter: no route ID, skipping cache"
1521                );
1522                return Ok(());
1523            }
1524        };
1525
1526        // Check if caching is enabled for this route
1527        if !self.cache_manager.is_enabled(route_id) {
1528            trace!(
1529                correlation_id = %ctx.trace_id,
1530                route_id = %route_id,
1531                "Cache disabled for route"
1532            );
1533            return Ok(());
1534        }
1535
1536        // Check if method is cacheable (typically GET/HEAD)
1537        if !self
1538            .cache_manager
1539            .is_method_cacheable(route_id, &ctx.method)
1540        {
1541            trace!(
1542                correlation_id = %ctx.trace_id,
1543                route_id = %route_id,
1544                method = %ctx.method,
1545                "Method not cacheable"
1546            );
1547            return Ok(());
1548        }
1549
1550        // Enable caching for this request using Pingora's cache infrastructure
1551        debug!(
1552            correlation_id = %ctx.trace_id,
1553            route_id = %route_id,
1554            method = %ctx.method,
1555            path = %ctx.path,
1556            "Enabling HTTP caching for request"
1557        );
1558
1559        // Get static references to cache infrastructure
1560        let storage = get_cache_storage();
1561        let eviction = get_cache_eviction();
1562        let cache_lock = get_cache_lock();
1563
1564        // Enable the cache with storage, eviction, and lock
1565        session.cache.enable(
1566            storage,
1567            Some(eviction),
1568            None, // predictor - optional
1569            Some(cache_lock),
1570            None, // option overrides
1571        );
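        // From here Pingora drives the rest of the cache flow: cache_key_callback
        // produces the key, a hit goes through cache_hit_filter, and a miss goes
        // through cache_miss and then response_cache_filter (all implemented below).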
1572
1573        // Mark request as cache-eligible in context
1574        ctx.cache_eligible = true;
1575
1576        trace!(
1577            correlation_id = %ctx.trace_id,
1578            route_id = %route_id,
1579            cache_enabled = session.cache.enabled(),
1580            "Cache enabled for request"
1581        );
1582
1583        Ok(())
1584    }
1585
1586    /// Generate the cache key for this request.
1587    ///
1588    /// The cache key uniquely identifies the cached response. It typically
1589    /// includes the method, host, path, and potentially query parameters.
1590    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1591        let req_header = session.req_header();
1592        let method = req_header.method.as_str();
1593        let path = req_header.uri.path();
1594        let host = ctx.host.as_deref().unwrap_or("unknown");
1595        let query = req_header.uri.query();
1596
1597        // Generate cache key using our cache manager
1598        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
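        // Purely illustrative (the real format is whatever generate_cache_key
        // returns): a key for "GET https://example.com/api/users?page=2" might
        // look like "GET:example.com:/api/users?page=2".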
1599
1600        trace!(
1601            correlation_id = %ctx.trace_id,
1602            cache_key = %key_string,
1603            "Generated cache key"
1604        );
1605
1606        // The lookup key itself uses Pingora's default generator (proper hashing
1607        // and internal format); key_string above is traced for observability only
1608        Ok(CacheKey::default(req_header))
1609    }
1610
1611    /// Called when a cache miss occurs.
1612    ///
1613    /// This is called when the cache lookup found no matching entry.
1614    /// We can use this to log and track cache misses.
1615    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1616        // Let Pingora handle the cache miss
1617        session.cache.cache_miss();
1618
1619        // Track statistics
1620        if let Some(route_id) = ctx.route_id.as_deref() {
1621            self.cache_manager.stats().record_miss();
1622
1623            trace!(
1624                correlation_id = %ctx.trace_id,
1625                route_id = %route_id,
1626                path = %ctx.path,
1627                "Cache miss"
1628            );
1629        }
1630    }
1631
1632    /// Called after a successful cache lookup.
1633    ///
1634    /// This filter allows inspecting the cached response before serving it.
1635    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
1636    /// to invalidate and refetch.
1637    async fn cache_hit_filter(
1638        &self,
1639        session: &mut Session,
1640        meta: &CacheMeta,
1641        _hit_handler: &mut HitHandler,
1642        is_fresh: bool,
1643        ctx: &mut Self::CTX,
1644    ) -> Result<Option<ForcedInvalidationKind>>
1645    where
1646        Self::CTX: Send + Sync,
1647    {
1648        // Check if this cache entry should be invalidated due to a purge request
1649        let req_header = session.req_header();
1650        let method = req_header.method.as_str();
1651        let path = req_header.uri.path();
1652        let host = ctx.host.as_deref().unwrap_or("unknown"); // match cache_key_callback's key inputs
1653        let query = req_header.uri.query();
1654
1655        // Generate the cache key for this request
1656        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1657
1658        // Check if this key should be invalidated
1659        if self.cache_manager.should_invalidate(&cache_key) {
1660            info!(
1661                correlation_id = %ctx.trace_id,
1662                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1663                cache_key = %cache_key,
1664                "Cache entry invalidated by purge request"
1665            );
1666            // Force expiration so the entry is refetched from upstream
1667            return Ok(Some(ForcedInvalidationKind::ForceExpired));
1668        }
1669
1670        // Track cache hit statistics
1671        if is_fresh {
1672            self.cache_manager.stats().record_hit();
1673
1674            debug!(
1675                correlation_id = %ctx.trace_id,
1676                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1677                is_fresh = is_fresh,
1678                "Cache hit (fresh)"
1679            );
1680        } else {
1681            trace!(
1682                correlation_id = %ctx.trace_id,
1683                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1684                is_fresh = is_fresh,
1685                "Cache hit (stale)"
1686            );
1687        }
1688
1689        // Serve the cached response without invalidation
1690        Ok(None)
1691    }
1692
1693    /// Decide if the response should be cached.
1694    ///
1695    /// Called after receiving the response from upstream to determine
1696    /// if it should be stored in the cache.
1697    fn response_cache_filter(
1698        &self,
1699        _session: &Session,
1700        resp: &ResponseHeader,
1701        ctx: &mut Self::CTX,
1702    ) -> Result<RespCacheable> {
1703        let route_id = match ctx.route_id.as_deref() {
1704            Some(id) => id,
1705            None => {
1706                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1707                    "no_route",
1708                )));
1709            }
1710        };
1711
1712        // Check if caching is enabled for this route
1713        if !self.cache_manager.is_enabled(route_id) {
1714            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1715                "disabled",
1716            )));
1717        }
1718
1719        let status = resp.status.as_u16();
1720
1721        // Check if status code is cacheable
1722        if !self.cache_manager.is_status_cacheable(route_id, status) {
1723            trace!(
1724                correlation_id = %ctx.trace_id,
1725                route_id = %route_id,
1726                status = status,
1727                "Status code not cacheable"
1728            );
1729            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1730        }
1731
1732        // Check Cache-Control header for no-store, no-cache, private
1733        if let Some(cache_control) = resp.headers.get("cache-control") {
1734            if let Ok(cc_str) = cache_control.to_str() {
1735                if crate::cache::CacheManager::is_no_cache(cc_str) {
1736                    trace!(
1737                        correlation_id = %ctx.trace_id,
1738                        route_id = %route_id,
1739                        cache_control = %cc_str,
1740                        "Response has no-cache directive"
1741                    );
1742                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1743                }
1744            }
1745        }
1746
1747        // Calculate TTL from Cache-Control or use default
1748        let cache_control = resp
1749            .headers
1750            .get("cache-control")
1751            .and_then(|v| v.to_str().ok());
1752        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
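        // Hedged sketch of the precedence calculate_ttl is assumed to follow
        // (the real logic lives in crate::cache::CacheManager):
        //   "max-age=300"        -> ttl of 300s
        //   "max-age=0"          -> zero ttl, rejected just below
        //   no usable directive  -> the route's configured default ttl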
1753
1754        if ttl.is_zero() {
1755            trace!(
1756                correlation_id = %ctx.trace_id,
1757                route_id = %route_id,
1758                "TTL is zero, not caching"
1759            );
1760            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1761        }
1762
1763        // Get route cache config for stale settings
1764        let config = self
1765            .cache_manager
1766            .get_route_config(route_id)
1767            .unwrap_or_default();
1768
1769        // Create timestamps for cache metadata
1770        let now = std::time::SystemTime::now();
1771        let fresh_until = now + ttl;
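        // Worked example (standard stale-while-revalidate / stale-if-error
        // semantics): with ttl = 300s, stale_while_revalidate_secs = 30 and
        // stale_if_error_secs = 600, the entry is fresh until now + 300s, may be
        // served stale for 30s while revalidating, and for up to 600s on errors.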
1772
1773        // Clone the response header for storage
1774        let header = resp.clone();
1775
1776        // Create CacheMeta with proper timestamps and TTLs
1777        let cache_meta = CacheMeta::new(
1778            fresh_until,
1779            now,
1780            config.stale_while_revalidate_secs as u32,
1781            config.stale_if_error_secs as u32,
1782            header,
1783        );
1784
1785        // Track the cache store
1786        self.cache_manager.stats().record_store();
1787
1788        debug!(
1789            correlation_id = %ctx.trace_id,
1790            route_id = %route_id,
1791            status = status,
1792            ttl_secs = ttl.as_secs(),
1793            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1794            stale_if_error_secs = config.stale_if_error_secs,
1795            "Caching response"
1796        );
1797
1798        Ok(RespCacheable::Cacheable(cache_meta))
1799    }
1800
1801    /// Decide whether to serve stale content on error or during revalidation.
1802    ///
1803    /// This implements stale-while-revalidate and stale-if-error semantics.
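    ///
    /// For comparison, the equivalent origin-driven directives (RFC 5861) would be
    /// `Cache-Control: max-age=60, stale-while-revalidate=30, stale-if-error=300`;
    /// here the stale windows come from the route's cache config instead.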
1804    fn should_serve_stale(
1805        &self,
1806        _session: &mut Session,
1807        ctx: &mut Self::CTX,
1808        error: Option<&Error>,
1809    ) -> bool {
1810        let route_id = match ctx.route_id.as_deref() {
1811            Some(id) => id,
1812            None => return false,
1813        };
1814
1815        // Get route cache config for stale settings
1816        let config = match self.cache_manager.get_route_config(route_id) {
1817            Some(c) => c,
1818            None => return false,
1819        };
1820
1821        // If there's an upstream error, use stale-if-error
1822        if let Some(e) = error {
1823            // Only serve stale for upstream errors
1824            if e.esource() == &pingora::ErrorSource::Upstream {
1825                debug!(
1826                    correlation_id = %ctx.trace_id,
1827                    route_id = %route_id,
1828                    error = %e,
1829                    stale_if_error_secs = config.stale_if_error_secs,
1830                    "Considering stale-if-error"
1831                );
1832                return config.stale_if_error_secs > 0;
1833            }
1834        }
1835
1836        // During stale-while-revalidate (error is None)
1837        if error.is_none() && config.stale_while_revalidate_secs > 0 {
1838            trace!(
1839                correlation_id = %ctx.trace_id,
1840                route_id = %route_id,
1841                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1842                "Allowing stale-while-revalidate"
1843            );
1844            return true;
1845        }
1846
1847        false
1848    }
1849
1850    /// Handle Range header for byte-range requests (streaming support).
1851    ///
1852    /// This method is called when a Range header is present in the request.
1853    /// It allows proper handling of:
1854    /// - Video streaming (HTML5 video seeking)
1855    /// - Large file downloads with resume support
1856    /// - Partial content delivery
1857    ///
1858    /// Uses Pingora's built-in range handling with route-specific logging.
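    ///
    /// For example (standard HTTP range semantics, not specific to this proxy),
    /// `Range: bytes=0-1023` against a 10_000-byte body yields `206 Partial Content`
    /// with `Content-Range: bytes 0-1023/10000`, while an unsatisfiable range
    /// yields `416 Range Not Satisfiable`.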
1859    fn range_header_filter(
1860        &self,
1861        session: &mut Session,
1862        response: &mut ResponseHeader,
1863        ctx: &mut Self::CTX,
1864    ) -> pingora_proxy::RangeType
1865    where
1866        Self::CTX: Send + Sync,
1867    {
1868        // Check if route supports range requests
1869        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1870            // Static file routes and media routes should support range requests
1871            matches!(
1872                config.service_type,
1873                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1874            )
1875        });
1876
1877        if !supports_range {
1878            trace!(
1879                correlation_id = %ctx.trace_id,
1880                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1881                "Range request not supported for this route type"
1882            );
1883            return pingora_proxy::RangeType::None;
1884        }
1885
1886        // Use Pingora's built-in range header parsing and handling
1887        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1888
1889        match &range_type {
1890            pingora_proxy::RangeType::None => {
1891                trace!(
1892                    correlation_id = %ctx.trace_id,
1893                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1894                    "No range request or not applicable"
1895                );
1896            }
1897            pingora_proxy::RangeType::Single(range) => {
1898                trace!(
1899                    correlation_id = %ctx.trace_id,
1900                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1901                    range_start = range.start,
1902                    range_end = range.end,
1903                    "Processing single-range request"
1904                );
1905            }
1906            pingora_proxy::RangeType::Multi(multi) => {
1907                trace!(
1908                    correlation_id = %ctx.trace_id,
1909                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1910                    range_count = multi.ranges.len(),
1911                    "Processing multi-range request"
1912                );
1913            }
1914            pingora_proxy::RangeType::Invalid => {
1915                debug!(
1916                    correlation_id = %ctx.trace_id,
1917                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1918                    "Invalid range header"
1919                );
1920            }
1921        }
1922
1923        range_type
1924    }
1925
1926    /// Handle fatal proxy errors by generating custom error pages.
1927    /// Called when the proxy itself fails to process the request.
1928    async fn fail_to_proxy(
1929        &self,
1930        session: &mut Session,
1931        e: &Error,
1932        ctx: &mut Self::CTX,
1933    ) -> pingora_proxy::FailToProxy
1934    where
1935        Self::CTX: Send + Sync,
1936    {
1937        let error_code = match e.etype() {
1938            // Connection errors
1939            ErrorType::ConnectRefused => 503,
1940            ErrorType::ConnectTimedout => 504,
1941            ErrorType::ConnectNoRoute => 502,
1942
1943            // Timeout errors
1944            ErrorType::ReadTimedout => 504,
1945            ErrorType::WriteTimedout => 504,
1946
1947            // TLS errors
1948            ErrorType::TLSHandshakeFailure => 502,
1949            ErrorType::InvalidCert => 502,
1950
1951            // Protocol errors
1952            ErrorType::InvalidHTTPHeader => 400,
1953            ErrorType::H2Error => 502,
1954
1955            // Resource errors
1956            ErrorType::ConnectProxyFailure => 502,
1957            ErrorType::ConnectionClosed => 502,
1958
1959            // Explicit HTTP status (e.g., from agent fail-closed blocking)
1960            ErrorType::HTTPStatus(status) => *status,
1961
1962            // Internal errors - return 502 for upstream issues (more accurate than 500)
1963            ErrorType::InternalError => {
1964                // Check if this is an upstream-related error
1965                let error_str = e.to_string();
1966                if error_str.contains("upstream")
1967                    || error_str.contains("DNS")
1968                    || error_str.contains("resolve")
1969                {
1970                    502
1971                } else {
1972                    500
1973                }
1974            }
1975
1976            // Default to 502 for unknown errors
1977            _ => 502,
1978        };
1979
1980        error!(
1981            correlation_id = %ctx.trace_id,
1982            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1983            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1984            error_type = ?e.etype(),
1985            error = %e,
1986            error_code = error_code,
1987            "Proxy error occurred"
1988        );
1989
1990        // Record the error in metrics
1991        self.metrics
1992            .record_blocked_request(&format!("proxy_error_{}", error_code));
1993
1994        // Write error response to ensure client receives a proper HTTP response
1995        // This is necessary because some errors occur before the upstream connection
1996        // is established, and Pingora may not send a response automatically
1997        let error_message = match error_code {
1998            400 => "Bad Request",
1999            502 => "Bad Gateway",
2000            503 => "Service Unavailable",
2001            504 => "Gateway Timeout",
2002            _ => "Internal Server Error",
2003        };
2004
2005        // Build a minimal error response body
2006        let body = format!(
2007            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2008            error_code, error_message, ctx.trace_id
2009        );
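        // e.g. {"error":"502 Bad Gateway","trace_id":"8f14e45f..."} (trace_id
        // shown truncated for illustration).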
2010
2011        // Write the response header
2012        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2013        header
2014            .insert_header("Content-Type", "application/json")
2015            .ok();
2016        header
2017            .insert_header("Content-Length", body.len().to_string())
2018            .ok();
2019        header
2020            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2021            .ok();
2022        header.insert_header("Connection", "close").ok();
2023
2024        // Write headers and body
2025        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2026            warn!(
2027                correlation_id = %ctx.trace_id,
2028                error = %write_err,
2029                "Failed to write error response header"
2030            );
2031        } else {
2032            // Write the body
2033            if let Err(write_err) = session
2034                .write_response_body(Some(bytes::Bytes::from(body)), true)
2035                .await
2036            {
2037                warn!(
2038                    correlation_id = %ctx.trace_id,
2039                    error = %write_err,
2040                    "Failed to write error response body"
2041                );
2042            }
2043        }
2044
2045        // Return the error response info
2046        // can_reuse_downstream: false since we already wrote and closed the response
2047        pingora_proxy::FailToProxy {
2048            error_code,
2049            can_reuse_downstream: false,
2050        }
2051    }
2052
2053    /// Handle errors that occur during proxying after upstream connection is established.
2054    ///
2055    /// This method enables retry logic and circuit breaker integration.
2056    /// It's called when an error occurs during the request/response exchange
2057    /// with the upstream server.
2058    fn error_while_proxy(
2059        &self,
2060        peer: &HttpPeer,
2061        session: &mut Session,
2062        e: Box<Error>,
2063        ctx: &mut Self::CTX,
2064        client_reused: bool,
2065    ) -> Box<Error> {
2066        let error_type = e.etype().clone();
2067        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2068
2069        // Classify error for retry decisions
2070        let is_retryable = matches!(
2071            error_type,
2072            ErrorType::ConnectTimedout
2073                | ErrorType::ReadTimedout
2074                | ErrorType::WriteTimedout
2075                | ErrorType::ConnectionClosed
2076                | ErrorType::ConnectRefused
2077        );
2078
2079        // Log the error with context
2080        warn!(
2081            correlation_id = %ctx.trace_id,
2082            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2083            upstream = %upstream_id,
2084            peer_address = %peer.address(),
2085            error_type = ?error_type,
2086            error = %e,
2087            client_reused = client_reused,
2088            is_retryable = is_retryable,
2089            "Error during proxy operation"
2090        );
2091
2092        // Record failure with circuit breaker via upstream pool
2093        // This is done asynchronously since we can't await in a sync fn
2094        let peer_address = peer.address().to_string();
2095        let upstream_pools = self.upstream_pools.clone();
2096        let upstream_id_owned = upstream_id.to_string();
2097        tokio::spawn(async move {
2098            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2099                pool.report_result(&peer_address, false).await;
2100            }
2101        });
2102
2103        // Metrics tracking
2104        self.metrics
2105            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2106
2107        // Create enhanced error with retry information
2108        let mut enhanced_error = e.more_context(format!(
2109            "Upstream: {}, Peer: {}, Attempts: {}",
2110            upstream_id,
2111            peer.address(),
2112            ctx.upstream_attempts
2113        ));
2114
2115        // Determine if retry should be attempted:
2116        // - Only retry if error is retryable type
2117        // - Only retry reused connections if buffer isn't truncated
2118        // - Track retry metrics
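        // Example: a ConnectTimedout on a reused keep-alive connection is retried
        // only while the request body is still fully buffered; once the retry
        // buffer is truncated (e.g. a large streamed upload), the error is
        // surfaced to the client instead of replaying a partial body.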
2119        if is_retryable {
2120            let can_retry = if client_reused {
2121                // For reused connections, check if retry buffer is intact
2122                !session.as_ref().retry_buffer_truncated()
2123            } else {
2124                // Fresh connections can always retry
2125                true
2126            };
2127
2128            enhanced_error.retry.decide_reuse(can_retry);
2129
2130            if can_retry {
2131                debug!(
2132                    correlation_id = %ctx.trace_id,
2133                    upstream = %upstream_id,
2134                    error_type = ?error_type,
2135                    "Error is retryable, will attempt retry"
2136                );
2137            }
2138        } else {
2139            // Non-retryable error - don't retry
2140            enhanced_error.retry.decide_reuse(false);
2141        }
2142
2143        enhanced_error
2144    }
2145
2146    async fn logging(&self, session: &mut Session, error: Option<&Error>, ctx: &mut Self::CTX) {
2147        // Decrement active requests
2148        self.reload_coordinator.dec_requests();
2149
2150        let duration = ctx.elapsed();
2151
2152        // Get response status
2153        let status = session
2154            .response_written()
2155            .map(|r| r.status.as_u16())
2156            .unwrap_or(0);
2157
2158        // Report result to load balancer for adaptive LB feedback
2159        // This enables latency-aware weight adjustment
2160        if let (Some(ref peer_addr), Some(ref upstream_id)) =
2161            (&ctx.selected_upstream_address, &ctx.upstream)
2162        {
2163            // Success = status code < 500 (client errors are not upstream failures)
2164            let success = status > 0 && status < 500;
2165
2166            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2167                pool.report_result_with_latency(peer_addr, success, Some(duration))
2168                    .await;
2169                trace!(
2170                    correlation_id = %ctx.trace_id,
2171                    upstream = %upstream_id,
2172                    peer_address = %peer_addr,
2173                    success = success,
2174                    duration_ms = duration.as_millis(),
2175                    status = status,
2176                    "Reported result to adaptive load balancer"
2177                );
2178            }
2179        }
2180
2181        // Write to access log file if configured
2182        if self.log_manager.access_log_enabled() {
2183            let access_entry = AccessLogEntry {
2184                timestamp: chrono::Utc::now().to_rfc3339(),
2185                trace_id: ctx.trace_id.clone(),
2186                method: ctx.method.clone(),
2187                path: ctx.path.clone(),
2188                query: ctx.query.clone(),
2189                protocol: "HTTP/1.1".to_string(),
2190                status,
2191                body_bytes: ctx.response_bytes,
2192                duration_ms: duration.as_millis() as u64,
2193                client_ip: ctx.client_ip.clone(),
2194                user_agent: ctx.user_agent.clone(),
2195                referer: ctx.referer.clone(),
2196                host: ctx.host.clone(),
2197                route_id: ctx.route_id.clone(),
2198                upstream: ctx.upstream.clone(),
2199                upstream_attempts: ctx.upstream_attempts,
2200                instance_id: self.app_state.instance_id.clone(),
2201                namespace: ctx.namespace.clone(),
2202                service: ctx.service.clone(),
2203            };
2204            self.log_manager.log_access(&access_entry);
2205        }
2206
2207        // Log to tracing at debug level (avoid allocations if debug disabled)
2208        if tracing::enabled!(tracing::Level::DEBUG) {
2209            debug!(
2210                trace_id = %ctx.trace_id,
2211                method = %ctx.method,
2212                path = %ctx.path,
2213                route_id = ?ctx.route_id,
2214                upstream = ?ctx.upstream,
2215                status = status,
2216                duration_ms = duration.as_millis() as u64,
2217                upstream_attempts = ctx.upstream_attempts,
2218                error = ?error.map(|e| e.to_string()),
2219                "Request completed"
2220            );
2221        }
2222
2223        // Log WebSocket upgrades at info level
2224        if ctx.is_websocket_upgrade && status == 101 {
2225            info!(
2226                trace_id = %ctx.trace_id,
2227                route_id = ?ctx.route_id,
2228                upstream = ?ctx.upstream,
2229                client_ip = %ctx.client_ip,
2230                "WebSocket connection established"
2231            );
2232        }
2233
2234        // End OpenTelemetry span
2235        if let Some(span) = ctx.otel_span.take() {
2236            span.end();
2237        }
2238    }
2239}
2240
2241// =============================================================================
2242// Helper methods for body streaming (not part of ProxyHttp trait)
2243// =============================================================================
2244
2245impl SentinelProxy {
2246    /// Process a single body chunk in streaming mode.
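    ///
    /// Chunks are forwarded to the inspection agents as they arrive: the running
    /// chunk_index from the request context is passed along with `end_of_stream`,
    /// and an agent may answer `needs_more` to defer its verdict until later
    /// chunks have been seen.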
2247    async fn process_body_chunk_streaming(
2248        &self,
2249        body: &mut Option<Bytes>,
2250        end_of_stream: bool,
2251        ctx: &mut RequestContext,
2252    ) -> Result<(), Box<Error>> {
2253        // Clone the chunk data to avoid borrowing issues when mutating body later
2254        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2255        let chunk_index = ctx.request_body_chunk_index;
2256        ctx.request_body_chunk_index += 1;
2257        ctx.body_bytes_inspected += chunk_data.len() as u64;
2258
2259        debug!(
2260            correlation_id = %ctx.trace_id,
2261            chunk_index = chunk_index,
2262            chunk_size = chunk_data.len(),
2263            end_of_stream = end_of_stream,
2264            "Streaming body chunk to agents"
2265        );
2266
2267        // Create agent call context
2268        let agent_ctx = crate::agents::AgentCallContext {
2269            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2270            metadata: sentinel_agent_protocol::RequestMetadata {
2271                correlation_id: ctx.trace_id.clone(),
2272                request_id: ctx.trace_id.clone(),
2273                client_ip: ctx.client_ip.clone(),
2274                client_port: 0,
2275                server_name: ctx.host.clone(),
2276                protocol: "HTTP/1.1".to_string(),
2277                tls_version: None,
2278                tls_cipher: None,
2279                route_id: ctx.route_id.clone(),
2280                upstream_id: ctx.upstream.clone(),
2281                timestamp: chrono::Utc::now().to_rfc3339(),
2282                traceparent: ctx.traceparent(),
2283            },
2284            route_id: ctx.route_id.clone(),
2285            upstream_id: ctx.upstream.clone(),
2286            request_body: None, // Not used in streaming mode
2287            response_body: None,
2288        };
2289
2290        let agent_ids = ctx.body_inspection_agents.clone();
2291        let total_size = None; // Unknown in streaming mode
2292
2293        match self
2294            .agent_manager
2295            .process_request_body_streaming(
2296                &agent_ctx,
2297                &chunk_data,
2298                end_of_stream,
2299                chunk_index,
2300                ctx.body_bytes_inspected as usize,
2301                total_size,
2302                &agent_ids,
2303            )
2304            .await
2305        {
2306            Ok(decision) => {
2307                // Track if agent needs more data
2308                ctx.agent_needs_more = decision.needs_more;
2309
2310                // Apply body mutation if present
2311                if let Some(ref mutation) = decision.request_body_mutation {
2312                    if !mutation.is_pass_through() {
2313                        if mutation.is_drop() {
2314                            // Drop the chunk
2315                            *body = None;
2316                            trace!(
2317                                correlation_id = %ctx.trace_id,
2318                                chunk_index = chunk_index,
2319                                "Agent dropped body chunk"
2320                            );
2321                        } else if let Some(ref new_data) = mutation.data {
2322                            // Replace chunk with mutated content
2323                            *body = Some(Bytes::from(new_data.clone()));
2324                            trace!(
2325                                correlation_id = %ctx.trace_id,
2326                                chunk_index = chunk_index,
2327                                original_size = chunk_data.len(),
2328                                new_size = new_data.len(),
2329                                "Agent mutated body chunk"
2330                            );
2331                        }
2332                    }
2333                }
2334
2335                // Check decision (only final if needs_more is false)
2336                if !decision.needs_more && !decision.is_allow() {
2337                    warn!(
2338                        correlation_id = %ctx.trace_id,
2339                        action = ?decision.action,
2340                        "Agent blocked request body"
2341                    );
2342                    self.metrics.record_blocked_request("agent_body_inspection");
2343
2344                    let (status, message) = match &decision.action {
2345                        crate::agents::AgentAction::Block { status, body, .. } => (
2346                            *status,
2347                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2348                        ),
2349                        _ => (403, "Forbidden".to_string()),
2350                    };
2351
2352                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2353                }
2354
2355                trace!(
2356                    correlation_id = %ctx.trace_id,
2357                    needs_more = decision.needs_more,
2358                    "Agent processed body chunk"
2359                );
2360            }
2361            Err(e) => {
2362                let fail_closed = ctx
2363                    .route_config
2364                    .as_ref()
2365                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2366                    .unwrap_or(false);
2367
2368                if fail_closed {
2369                    error!(
2370                        correlation_id = %ctx.trace_id,
2371                        error = %e,
2372                        "Agent streaming body inspection failed, blocking (fail-closed)"
2373                    );
2374                    return Err(Error::explain(
2375                        ErrorType::HTTPStatus(503),
2376                        "Service unavailable",
2377                    ));
2378                } else {
2379                    warn!(
2380                        correlation_id = %ctx.trace_id,
2381                        error = %e,
2382                        "Agent streaming body inspection failed, allowing (fail-open)"
2383                    );
2384                }
2385            }
2386        }
2387
2388        Ok(())
2389    }
2390
2391    /// Send buffered body to agents (buffer mode).
2392    async fn send_buffered_body_to_agents(
2393        &self,
2394        end_of_stream: bool,
2395        ctx: &mut RequestContext,
2396    ) -> Result<(), Box<Error>> {
2397        debug!(
2398            correlation_id = %ctx.trace_id,
2399            buffer_size = ctx.body_buffer.len(),
2400            end_of_stream = end_of_stream,
2401            agent_count = ctx.body_inspection_agents.len(),
2402            decompression_enabled = ctx.decompression_enabled,
2403            "Sending buffered body to agents for inspection"
2404        );
2405
2406        // Decompress body if enabled and we have a supported encoding
2407        let body_for_inspection = if ctx.decompression_enabled {
2408            if let Some(ref encoding) = ctx.body_content_encoding {
2409                let config = crate::decompression::DecompressionConfig {
2410                    max_ratio: ctx.max_decompression_ratio,
2411                    max_output_bytes: ctx.max_decompression_bytes,
2412                };
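                // Illustration of the guardrails above (exact ratio semantics are
                // assumed): with max_ratio = 100, a 10 KiB gzip payload that
                // inflates past ~1 MiB, or past max_output_bytes, is rejected as a
                // likely decompression bomb.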
2413
2414                match crate::decompression::decompress_body(
2415                    &ctx.body_buffer,
2416                    encoding,
2417                    &config,
2418                ) {
2419                    Ok(result) => {
2420                        ctx.body_was_decompressed = true;
2421                        self.metrics
2422                            .record_decompression_success(encoding, result.ratio);
2423                        debug!(
2424                            correlation_id = %ctx.trace_id,
2425                            encoding = %encoding,
2426                            compressed_size = result.compressed_size,
2427                            decompressed_size = result.decompressed_size,
2428                            ratio = result.ratio,
2429                            "Body decompressed for agent inspection"
2430                        );
2431                        result.data
2432                    }
2433                    Err(e) => {
2434                        // Record failure metric
2435                        let failure_reason = match &e {
2436                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
2437                                "ratio_exceeded"
2438                            }
2439                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
2440                                "size_exceeded"
2441                            }
2442                            crate::decompression::DecompressionError::InvalidData { .. } => {
2443                                "invalid_data"
2444                            }
2445                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2446                                "unsupported"
2447                            }
2448                            crate::decompression::DecompressionError::IoError(_) => "io_error",
2449                        };
2450                        self.metrics
2451                            .record_decompression_failure(encoding, failure_reason);
2452
2453                        // Decompression failed - decide based on failure mode
2454                        let fail_closed = ctx
2455                            .route_config
2456                            .as_ref()
2457                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2458                            .unwrap_or(false);
2459
2460                        if fail_closed {
2461                            error!(
2462                                correlation_id = %ctx.trace_id,
2463                                error = %e,
2464                                encoding = %encoding,
2465                                "Decompression failed, blocking (fail-closed)"
2466                            );
2467                            return Err(Error::explain(
2468                                ErrorType::HTTPStatus(400),
2469                                "Invalid compressed body",
2470                            ));
2471                        } else {
2472                            warn!(
2473                                correlation_id = %ctx.trace_id,
2474                                error = %e,
2475                                encoding = %encoding,
2476                                "Decompression failed, sending compressed body (fail-open)"
2477                            );
2478                            ctx.body_buffer.clone()
2479                        }
2480                    }
2481                }
2482            } else {
2483                ctx.body_buffer.clone()
2484            }
2485        } else {
2486            ctx.body_buffer.clone()
2487        };
2488
2489        let agent_ctx = crate::agents::AgentCallContext {
2490            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2491            metadata: sentinel_agent_protocol::RequestMetadata {
2492                correlation_id: ctx.trace_id.clone(),
2493                request_id: ctx.trace_id.clone(),
2494                client_ip: ctx.client_ip.clone(),
2495                client_port: 0,
2496                server_name: ctx.host.clone(),
2497                protocol: "HTTP/1.1".to_string(),
2498                tls_version: None,
2499                tls_cipher: None,
2500                route_id: ctx.route_id.clone(),
2501                upstream_id: ctx.upstream.clone(),
2502                timestamp: chrono::Utc::now().to_rfc3339(),
2503                traceparent: ctx.traceparent(),
2504            },
2505            route_id: ctx.route_id.clone(),
2506            upstream_id: ctx.upstream.clone(),
2507            request_body: Some(body_for_inspection.clone()),
2508            response_body: None,
2509        };
2510
2511        let agent_ids = ctx.body_inspection_agents.clone();
2512        match self
2513            .agent_manager
2514            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2515            .await
2516        {
2517            Ok(decision) => {
2518                if !decision.is_allow() {
2519                    warn!(
2520                        correlation_id = %ctx.trace_id,
2521                        action = ?decision.action,
2522                        "Agent blocked request body"
2523                    );
2524                    self.metrics.record_blocked_request("agent_body_inspection");
2525
2526                    let (status, message) = match &decision.action {
2527                        crate::agents::AgentAction::Block { status, body, .. } => (
2528                            *status,
2529                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2530                        ),
2531                        _ => (403, "Forbidden".to_string()),
2532                    };
2533
2534                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2535                }
2536
2537                trace!(
2538                    correlation_id = %ctx.trace_id,
2539                    "Agent allowed request body"
2540                );
2541            }
2542            Err(e) => {
2543                let fail_closed = ctx
2544                    .route_config
2545                    .as_ref()
2546                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2547                    .unwrap_or(false);
2548
2549                if fail_closed {
2550                    error!(
2551                        correlation_id = %ctx.trace_id,
2552                        error = %e,
2553                        "Agent body inspection failed, blocking (fail-closed)"
2554                    );
2555                    return Err(Error::explain(
2556                        ErrorType::HTTPStatus(503),
2557                        "Service unavailable",
2558                    ));
2559                } else {
2560                    warn!(
2561                        correlation_id = %ctx.trace_id,
2562                        error = %e,
2563                        "Agent body inspection failed, allowing (fail-open)"
2564                    );
2565                }
2566            }
2567        }
2568
2569        Ok(())
2570    }
2571}