sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.
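//!
//! Lifecycle hooks implemented here (in request order):
//! - `early_request_filter`: serves ACME HTTP-01 challenges and builtin routes
//!   before any upstream selection.
//! - `upstream_peer`: matches the route, applies model-based and fallback
//!   routing, then selects a peer from the upstream pool with retries.
//! - `request_filter`: enforces rate limits, token budgets, prompt-injection
//!   guardrails, geo filtering, and WebSocket upgrade policy; returning
//!   `Ok(true)` means a response was already written and proxying stops here.
//! - `fail_to_connect`: logs upstream connection failures (custom error pages
//!   are produced later in `response_filter`).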

use async_trait::async_trait;
use bytes::Bytes;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora::protocols::Digest;
use pingora::proxy::{ProxyHttp, Session};
use pingora::upstreams::peer::Peer;
use pingora_cache::{
    CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
};
use pingora_timeout::sleep;
use std::os::unix::io::RawFd;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
use crate::inference::{
    extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
};
use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
use crate::rate_limit::HeaderAccessor;
use crate::routing::RequestInfo;

use super::context::{FallbackReason, RequestContext};
use super::fallback::FallbackEvaluator;
use super::fallback_metrics::get_fallback_metrics;
use super::model_routing;
use super::model_routing_metrics::get_model_routing_metrics;
use super::SentinelProxy;

/// Helper type for rate limiting when we don't need header access
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}
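// (Used in `request_filter` below as `Option::<&NoHeaderAccessor>::None` when the
// rate-limit check is keyed only by client IP and path.)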

#[async_trait]
impl ProxyHttp for SentinelProxy {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::new()
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        error!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
            peer_address = %peer.address(),
            error = %e,
            "Failed to connect to upstream peer"
        );
        // Custom error pages are handled in response_filter
        e
    }

    /// Early request filter - runs before upstream selection
    /// Used to handle builtin routes that don't need an upstream connection
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        // Handle ACME HTTP-01 challenges before any other processing
        if let Some(ref challenge_manager) = self.acme_challenges {
            if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
                if let Some(key_authorization) = challenge_manager.get_response(token) {
                    debug!(
                        token = %token,
                        "Serving ACME HTTP-01 challenge response"
                    );

                    // Build response
                    let mut resp = ResponseHeader::build(200, None)?;
                    resp.insert_header("Content-Type", "text/plain")?;
                    resp.insert_header("Content-Length", key_authorization.len().to_string())?;

                    // Send response
                    session
                        .write_response_header(Box::new(resp), false)
                        .await?;
                    session
                        .write_response_body(Some(Bytes::from(key_authorization)), true)
                        .await?;

                    // Return error to signal request is complete
                    return Err(Error::explain(
                        ErrorType::InternalError,
                        "ACME challenge served",
                    ));
                } else {
                    // Token not found - could be a stale request or attack
                    warn!(
                        token = %token,
                        "ACME challenge token not found"
                    );
                }
            }
        }

        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Parse incoming W3C trace context if present
        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
            if let Ok(s) = traceparent.to_str() {
                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
            }
        }

        // Start OpenTelemetry request span if tracing is enabled
        if let Some(tracer) = crate::otel::get_tracer() {
            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
        }

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            // Set trace_id if not already set by early_request_filter
            if ctx.trace_id.is_empty() {
                ctx.trace_id = self.get_trace_id(session);

                // Parse incoming W3C trace context if present
                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
                {
                    if let Ok(s) = traceparent.to_str() {
                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
                    }
                }

                // Start OpenTelemetry request span if tracing is enabled
                if let Some(tracer) = crate::otel::get_tracer() {
                    ctx.otel_span = Some(tracer.start_span(
                        &ctx.method,
                        &ctx.path,
                        ctx.trace_context.as_ref(),
                    ));
                }
            }

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // === Model-based routing (for inference routes) ===
        // Check if model routing is configured and select upstream based on model
        let mut model_routing_applied = false;
        if let Some(ref inference) = route_match.config.inference {
            if let Some(ref model_routing) = inference.model_routing {
                // Try to extract model from headers (fast path - no body parsing needed)
                let model = model_routing::extract_model_from_headers(&req_header.headers);

                if let Some(ref model_name) = model {
                    // Find upstream for this model
                    if let Some(routing_result) =
                        model_routing::find_upstream_for_model(model_routing, model_name)
                    {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = %route_match.route_id,
                            model = %model_name,
                            upstream = %routing_result.upstream,
                            is_default = routing_result.is_default,
                            provider_override = ?routing_result.provider,
                            "Model-based routing selected upstream"
                        );

                        ctx.record_model_routing(
                            &routing_result.upstream,
                            Some(model_name.clone()),
                            routing_result.provider,
                        );
                        model_routing_applied = true;

                        // Record metrics
                        if let Some(metrics) = get_model_routing_metrics() {
                            metrics.record_model_routed(
                                route_match.route_id.as_str(),
                                model_name,
                                &routing_result.upstream,
                            );
                            if routing_result.is_default {
                                metrics.record_default_upstream(route_match.route_id.as_str());
                            }
                            if let Some(provider) = routing_result.provider {
                                metrics.record_provider_override(
                                    route_match.route_id.as_str(),
                                    &routing_result.upstream,
                                    provider.as_str(),
                                );
                            }
                        }
                    }
                } else if let Some(ref default_upstream) = model_routing.default_upstream {
                    // No model in headers, use default upstream
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_match.route_id,
                        upstream = %default_upstream,
                        "Model-based routing using default upstream (no model header)"
                    );
                    ctx.record_model_routing(default_upstream, None, None);
                    model_routing_applied = true;

                    // Record metrics for no model header case
                    if let Some(metrics) = get_model_routing_metrics() {
                        metrics.record_no_model_header(route_match.route_id.as_str());
                    }
                }
            }
        }

        // Regular route with upstream (if model routing didn't set it)
        if !model_routing_applied {
            if let Some(ref upstream) = route_match.config.upstream {
                ctx.upstream = Some(upstream.clone());
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    upstream = %upstream,
                    "Upstream configured for route"
                );
            } else {
                error!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    "Route has no upstream configured"
                );
                return Err(Error::explain(
                    ErrorType::InternalError,
                    format!(
                        "Route '{}' has no upstream configured",
                        route_match.route_id
                    ),
                ));
            }
        }

        // === Fallback routing evaluation (pre-request) ===
        // Check if fallback should be triggered due to health or budget conditions
        if let Some(ref fallback_config) = route_match.config.fallback {
            let upstream_name = ctx.upstream.as_ref().unwrap();

            // Check if primary upstream is healthy
            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
                pool.has_healthy_targets().await
            } else {
                false // Pool not found, treat as unhealthy
            };

            // Check if budget is exhausted (for inference routes)
            let is_budget_exhausted = ctx.inference_budget_exhausted;

            // Get model name for model mapping (inference routes)
            let current_model = ctx.inference_model.as_deref();

            // Create fallback evaluator
            let evaluator = FallbackEvaluator::new(
                fallback_config,
                ctx.tried_upstreams(),
                ctx.fallback_attempt,
            );

            // Evaluate pre-request fallback conditions
            if let Some(decision) = evaluator.should_fallback_before_request(
                upstream_name,
                is_healthy,
                is_budget_exhausted,
                current_model,
            ) {
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    from_upstream = %upstream_name,
                    to_upstream = %decision.next_upstream,
                    reason = %decision.reason,
                    fallback_attempt = ctx.fallback_attempt + 1,
                    "Triggering fallback routing"
                );

                // Record fallback metrics
                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_attempt(
                        route_match.route_id.as_str(),
                        upstream_name,
                        &decision.next_upstream,
                        &decision.reason,
                    );
                }

                // Record fallback in context
                ctx.record_fallback(decision.reason, &decision.next_upstream);

                // Apply model mapping if present
                if let Some((original, mapped)) = decision.model_mapping {
                    // Record model mapping metrics
                    if let Some(metrics) = get_fallback_metrics() {
                        metrics.record_model_mapping(
                            route_match.route_id.as_str(),
                            &original,
                            &mapped,
                        );
                    }

                    ctx.record_model_mapping(original, mapped);
                    trace!(
                        correlation_id = %ctx.trace_id,
                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
                        "Applied model mapping for fallback"
                    );
                }
            }
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);
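        // No retry policy on the route means a single selection attempt.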

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer_with_metadata(None).await {
                Ok((peer, metadata)) => {
                    let selection_duration = selection_start.elapsed();
                    // Store selected peer address for feedback reporting in logging()
                    let peer_addr = peer.address().to_string();
                    ctx.selected_upstream_address = Some(peer_addr.clone());

                    // Copy sticky session metadata to context for response_filter
                    if metadata.contains_key("sticky_session_new") {
                        ctx.sticky_session_new_assignment = true;
                        ctx.sticky_session_set_cookie =
                            metadata.get("sticky_set_cookie_header").cloned();
                        ctx.sticky_target_index = metadata
                            .get("sticky_target_index")
                            .and_then(|s| s.parse().ok());

                        trace!(
                            correlation_id = %ctx.trace_id,
                            sticky_target_index = ?ctx.sticky_target_index,
                            "New sticky session assignment, will set cookie"
                        );
                    }

                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer_addr,
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        sticky_session_hit = metadata.contains_key("sticky_session_hit"),
                        sticky_session_new = ctx.sticky_session_new_assignment,
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency)
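                        // With a 100 ms base this doubles per attempt: 100 ms, 200 ms, 400 ms, ...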
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        // Record exhausted metric if fallback was used but all upstreams failed
        if ctx.used_fallback() {
            if let Some(metrics) = get_fallback_metrics() {
                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
            }
        }

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }

    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool, Box<Error>> {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Starting request filter phase"
        );

        // Check rate limiting early (before other processing)
        // Fast path: skip if no rate limiting is configured for this route
        if let Some(route_id) = ctx.route_id.as_deref() {
            if self.rate_limit_manager.has_route_limiter(route_id) {
                let rate_result = self.rate_limit_manager.check(
                    route_id,
                    &ctx.client_ip,
                    &ctx.path,
                    Option::<&NoHeaderAccessor>::None,
                );

                // Store rate limit info for response headers (even if allowed)
                if rate_result.limit > 0 {
                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
                        limit: rate_result.limit,
                        remaining: rate_result.remaining,
                        reset_at: rate_result.reset_at,
                    });
                }

                if !rate_result.allowed {
                    use sentinel_config::RateLimitAction;

                    match rate_result.action {
                        RateLimitAction::Reject => {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                limiter = %rate_result.limiter,
                                limit = rate_result.limit,
                                remaining = rate_result.remaining,
                                "Request rate limited"
                            );
                            self.metrics.record_blocked_request("rate_limited");

                            // Audit log the rate limit
                            let audit_entry = AuditLogEntry::rate_limited(
                                &ctx.trace_id,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                                &rate_result.limiter,
                            )
                            .with_route_id(route_id)
                            .with_status_code(rate_result.status_code);
                            self.log_manager.log_audit(&audit_entry);

                            // Send rate limit response with headers
                            let body = rate_result
                                .message
                                .unwrap_or_else(|| "Rate limit exceeded".to_string());

                            // Build response with rate limit headers
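                            // Retry-After is the whole seconds until the limiter window
                            // resets, clamped to zero if the reset time is already past.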
                            let retry_after = rate_result.reset_at.saturating_sub(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_secs(),
                            );
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                rate_result.status_code,
                                &body,
                                rate_result.limit,
                                rate_result.remaining,
                                rate_result.reset_at,
                                retry_after,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }
                        RateLimitAction::LogOnly => {
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit exceeded (log only mode)"
                            );
                            // Continue processing
                        }
                        RateLimitAction::Delay => {
                            // Apply delay if suggested by rate limiter
                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
                                // Cap delay at the configured maximum
                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);

                                if actual_delay > 0 {
                                    debug!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        suggested_delay_ms = delay_ms,
                                        max_delay_ms = rate_result.max_delay_ms,
                                        actual_delay_ms = actual_delay,
                                        "Applying rate limit delay"
                                    );

                                    tokio::time::sleep(std::time::Duration::from_millis(
                                        actual_delay,
                                    ))
                                    .await;
                                }
                            }
                            // Continue processing after delay
                        }
                    }
                }
            }
        }

        // Inference rate limiting (token-based, for LLM/AI routes)
        // This runs after regular rate limiting and checks service type
        if let Some(route_id) = ctx.route_id.as_deref() {
            if let Some(ref route_config) = ctx.route_config {
                if route_config.service_type == sentinel_config::ServiceType::Inference
                    && self.inference_rate_limit_manager.has_route(route_id)
                {
                    // For inference rate limiting, we need access to the request body
                    // to estimate tokens. We'll use buffered body if available.
                    let headers = &session.req_header().headers;

                    // Try to get buffered body, or use empty (will estimate from headers only)
                    let body = ctx.body_buffer.as_slice();

                    // Use client IP as the rate limit key (could be enhanced to use API key header)
                    let rate_limit_key = &ctx.client_ip;

                    if let Some(check_result) = self.inference_rate_limit_manager.check(
                        route_id,
                        rate_limit_key,
                        headers,
                        body,
                    ) {
                        // Store inference rate limiting context for recording actual tokens later
                        ctx.inference_rate_limit_enabled = true;
                        ctx.inference_estimated_tokens = check_result.estimated_tokens;
                        ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
                        ctx.inference_model = check_result.model.clone();

                        if !check_result.is_allowed() {
                            let retry_after_ms = check_result.retry_after_ms();
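                            // Round up so a sub-second wait still advertises at least 1s.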
                            let retry_after_secs = retry_after_ms.div_ceil(1000);

                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                estimated_tokens = check_result.estimated_tokens,
                                model = ?check_result.model,
                                retry_after_ms = retry_after_ms,
                                "Inference rate limit exceeded (tokens)"
                            );
                            self.metrics.record_blocked_request("inference_rate_limited");

                            // Audit log the token rate limit
                            let audit_entry = AuditLogEntry::new(
                                &ctx.trace_id,
                                AuditEventType::RateLimitExceeded,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                            )
                            .with_route_id(route_id)
                            .with_status_code(429)
                            .with_reason(format!(
                                "Token rate limit exceeded: estimated {} tokens, model={:?}",
                                check_result.estimated_tokens, check_result.model
                            ));
                            self.log_manager.log_audit(&audit_entry);

                            // Send 429 response with appropriate headers
                            let body = "Token rate limit exceeded";
                            let reset_at = std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or_default()
                                .as_secs()
                                + retry_after_secs;

                            // Use simplified error write for inference rate limit
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                429,
                                body,
                                0, // No request limit
                                0, // No remaining
                                reset_at,
                                retry_after_secs,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }

                        trace!(
                            correlation_id = %ctx.trace_id,
                            route_id = route_id,
                            estimated_tokens = check_result.estimated_tokens,
                            model = ?check_result.model,
                            "Inference rate limit check passed"
                        );

                        // Check budget tracking (cumulative per-period limits)
                        if self.inference_rate_limit_manager.has_budget(route_id) {
                            ctx.inference_budget_enabled = true;

                            if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
                                route_id,
                                rate_limit_key,
                                check_result.estimated_tokens,
                            ) {
                                if !budget_result.is_allowed() {
                                    let retry_after_secs = budget_result.retry_after_secs();

                                    warn!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        client_ip = %ctx.client_ip,
                                        estimated_tokens = check_result.estimated_tokens,
                                        retry_after_secs = retry_after_secs,
                                        "Token budget exhausted"
                                    );

                                    ctx.inference_budget_exhausted = true;
                                    self.metrics.record_blocked_request("budget_exhausted");

                                    // Audit log the budget exhaustion
                                    let audit_entry = AuditLogEntry::new(
                                        &ctx.trace_id,
                                        AuditEventType::RateLimitExceeded,
                                        &ctx.method,
                                        &ctx.path,
                                        &ctx.client_ip,
                                    )
                                    .with_route_id(route_id)
                                    .with_status_code(429)
                                    .with_reason("Token budget exhausted".to_string());
                                    self.log_manager.log_audit(&audit_entry);

                                    // Send 429 response with budget headers
                                    let body = "Token budget exhausted";
                                    let reset_at = std::time::SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .unwrap_or_default()
                                        .as_secs()
                                        + retry_after_secs;

                                    crate::http_helpers::write_rate_limit_error(
                                        session,
                                        429,
                                        body,
                                        0,
                                        0,
                                        reset_at,
                                        retry_after_secs,
                                    )
                                    .await?;
                                    return Ok(true);
                                }

                                // Capture budget status for response headers
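                                // `Allowed` reports an unsigned remaining count; `Soft` is
                                // already signed (presumably so an overage can go negative).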
969                                let remaining = match &budget_result {
970                                    sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
971                                    sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
972                                    _ => 0,
973                                };
974                                ctx.inference_budget_remaining = Some(remaining);
975
976                                // Get period reset time from budget status
977                                if let Some(status) = self.inference_rate_limit_manager.budget_status(
978                                    route_id,
979                                    rate_limit_key,
980                                ) {
981                                    ctx.inference_budget_period_reset = Some(status.period_end);
982                                }
983
984                                trace!(
985                                    correlation_id = %ctx.trace_id,
986                                    route_id = route_id,
987                                    budget_remaining = remaining,
988                                    "Token budget check passed"
989                                );
990                            }
991                        }
992
993                        // Check if cost attribution is enabled
994                        if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
995                            ctx.inference_cost_enabled = true;
996                        }
997                    }
998                }
999            }
1000        }
1001
1002        // Prompt injection guardrail (for inference routes)
1003        if let Some(ref route_config) = ctx.route_config {
1004            if let Some(ref inference) = route_config.inference {
1005                if let Some(ref guardrails) = inference.guardrails {
1006                    if let Some(ref pi_config) = guardrails.prompt_injection {
1007                        if pi_config.enabled && !ctx.body_buffer.is_empty() {
1008                            ctx.guardrails_enabled = true;
1009
1010                            // Extract content from request body
1011                            if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1012                                let result = self
1013                                    .guardrail_processor
1014                                    .check_prompt_injection(
1015                                        pi_config,
1016                                        &content,
1017                                        ctx.inference_model.as_deref(),
1018                                        ctx.route_id.as_deref(),
1019                                        &ctx.trace_id,
1020                                    )
1021                                    .await;
1022
1023                                match result {
1024                                    PromptInjectionResult::Blocked {
1025                                        status,
1026                                        message,
1027                                        detections,
1028                                    } => {
1029                                        warn!(
1030                                            correlation_id = %ctx.trace_id,
1031                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1032                                            detection_count = detections.len(),
1033                                            "Prompt injection detected, blocking request"
1034                                        );
1035
1036                                        self.metrics.record_blocked_request("prompt_injection");
1037
1038                                        // Store detection categories for logging
1039                                        ctx.guardrail_detection_categories = detections
1040                                            .iter()
1041                                            .map(|d| d.category.clone())
1042                                            .collect();
1043
1044                                        // Audit log the block
1045                                        let audit_entry = AuditLogEntry::new(
1046                                            &ctx.trace_id,
1047                                            AuditEventType::Blocked,
1048                                            &ctx.method,
1049                                            &ctx.path,
1050                                            &ctx.client_ip,
1051                                        )
1052                                        .with_route_id(
1053                                            ctx.route_id.as_deref().unwrap_or("unknown"),
1054                                        )
1055                                        .with_status_code(status)
1056                                        .with_reason("Prompt injection detected".to_string());
1057                                        self.log_manager.log_audit(&audit_entry);
1058
1059                                        // Send error response
1060                                        crate::http_helpers::write_json_error(
1061                                            session,
1062                                            status,
1063                                            "prompt_injection_blocked",
1064                                            Some(&message),
1065                                        )
1066                                        .await?;
1067                                        return Ok(true);
1068                                    }
1069                                    PromptInjectionResult::Detected { detections } => {
1070                                        // Log but allow
1071                                        warn!(
1072                                            correlation_id = %ctx.trace_id,
1073                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1074                                            detection_count = detections.len(),
1075                                            "Prompt injection detected (logged only)"
1076                                        );
1077                                        ctx.guardrail_detection_categories = detections
1078                                            .iter()
1079                                            .map(|d| d.category.clone())
1080                                            .collect();
1081                                    }
1082                                    PromptInjectionResult::Warning { detections } => {
1083                                        // Set flag for response header
1084                                        ctx.guardrail_warning = true;
1085                                        ctx.guardrail_detection_categories = detections
1086                                            .iter()
1087                                            .map(|d| d.category.clone())
1088                                            .collect();
1089                                        debug!(
1090                                            correlation_id = %ctx.trace_id,
1091                                            "Prompt injection warning set"
1092                                        );
1093                                    }
1094                                    PromptInjectionResult::Clean => {
1095                                        trace!(
1096                                            correlation_id = %ctx.trace_id,
1097                                            "No prompt injection detected"
1098                                        );
1099                                    }
1100                                    PromptInjectionResult::Error { message } => {
1101                                        // Already logged in processor, just trace here
1102                                        trace!(
1103                                            correlation_id = %ctx.trace_id,
1104                                            error = %message,
1105                                            "Prompt injection check error (failure mode applied)"
1106                                        );
1107                                    }
1108                                }
1109                            }
1110                        }
1111                    }
1112                }
1113            }
1114        }
1115
1116        // Geo filtering
1117        if let Some(route_id) = ctx.route_id.as_deref() {
1118            if let Some(ref route_config) = ctx.route_config {
1119                for filter_id in &route_config.filters {
1120                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1121                        // Store country code for response header
1122                        ctx.geo_country_code = result.country_code.clone();
1123                        ctx.geo_lookup_performed = true;
1124
1125                        if !result.allowed {
1126                            warn!(
1127                                correlation_id = %ctx.trace_id,
1128                                route_id = route_id,
1129                                client_ip = %ctx.client_ip,
1130                                country = ?result.country_code,
1131                                filter_id = %filter_id,
1132                                "Request blocked by geo filter"
1133                            );
1134                            self.metrics.record_blocked_request("geo_blocked");
1135
1136                            // Audit log the geo block
1137                            let audit_entry = AuditLogEntry::new(
1138                                &ctx.trace_id,
1139                                AuditEventType::Blocked,
1140                                &ctx.method,
1141                                &ctx.path,
1142                                &ctx.client_ip,
1143                            )
1144                            .with_route_id(route_id)
1145                            .with_status_code(result.status_code)
1146                            .with_reason(format!(
1147                                "Geo blocked: country={}, filter={}",
1148                                result.country_code.as_deref().unwrap_or("unknown"),
1149                                filter_id
1150                            ));
1151                            self.log_manager.log_audit(&audit_entry);
1152
1153                            // Send geo block response
1154                            let body = result
1155                                .block_message
1156                                .unwrap_or_else(|| "Access denied".to_string());
1157
1158                            crate::http_helpers::write_error(
1159                                session,
1160                                result.status_code,
1161                                &body,
1162                                "text/plain",
1163                            )
1164                            .await?;
1165                            return Ok(true); // Request complete, don't continue
1166                        }
1167
1168                        // Only check first geo filter that matches
1169                        break;
1170                    }
1171                }
1172            }
1173        }
1174
1175        // Check for WebSocket upgrade requests
1176        let is_websocket_upgrade = session
1177            .req_header()
1178            .headers
1179            .get(http::header::UPGRADE)
1180            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1181            .unwrap_or(false);
1182
1183        if is_websocket_upgrade {
1184            ctx.is_websocket_upgrade = true;
1185
1186            // Check if route allows WebSocket upgrades
1187            if let Some(ref route_config) = ctx.route_config {
1188                if !route_config.websocket {
1189                    warn!(
1190                        correlation_id = %ctx.trace_id,
1191                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1192                        client_ip = %ctx.client_ip,
1193                        "WebSocket upgrade rejected: not enabled for route"
1194                    );
1195
1196                    self.metrics.record_blocked_request("websocket_not_enabled");
1197
1198                    // Audit log the rejection
1199                    let audit_entry = AuditLogEntry::new(
1200                        &ctx.trace_id,
1201                        AuditEventType::Blocked,
1202                        &ctx.method,
1203                        &ctx.path,
1204                        &ctx.client_ip,
1205                    )
1206                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1207                    .with_action("websocket_rejected")
1208                    .with_reason("WebSocket not enabled for route");
1209                    self.log_manager.log_audit(&audit_entry);
1210
1211                    // Send 403 Forbidden response
1212                    crate::http_helpers::write_error(
1213                        session,
1214                        403,
1215                        "WebSocket not enabled for this route",
1216                        "text/plain",
1217                    )
1218                    .await?;
1219                    return Ok(true); // Request complete, don't continue
1220                }
1221
1222                debug!(
1223                    correlation_id = %ctx.trace_id,
1224                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1225                    "WebSocket upgrade request allowed"
1226                );
1227
1228                // Check for WebSocket frame inspection
1229                if route_config.websocket_inspection {
1230                    // Check for compression negotiation - skip inspection if permessage-deflate
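                    // For example, a negotiating client typically sends:
                    //   Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
                    // Compressed frames cannot be inspected without inflating them, so inspection is skipped.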
1231                    let has_compression = session
1232                        .req_header()
1233                        .headers
1234                        .get("Sec-WebSocket-Extensions")
1235                        .and_then(|v| v.to_str().ok())
1236                        .map(|s| s.contains("permessage-deflate"))
1237                        .unwrap_or(false);
1238
1239                    if has_compression {
1240                        debug!(
1241                            correlation_id = %ctx.trace_id,
1242                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1243                            "WebSocket inspection skipped: permessage-deflate negotiated"
1244                        );
1245                        ctx.websocket_skip_inspection = true;
1246                    } else {
1247                        ctx.websocket_inspection_enabled = true;
1248
1249                        // Get agents that handle WebSocketFrame events
1250                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1251                            sentinel_agent_protocol::EventType::WebSocketFrame,
1252                        );
1253
1254                        debug!(
1255                            correlation_id = %ctx.trace_id,
1256                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1257                            agent_count = ctx.websocket_inspection_agents.len(),
1258                            "WebSocket frame inspection enabled"
1259                        );
1260                    }
1261                }
1262            }
1263        }
1264
1265        // Use cached route config from upstream_peer (avoids duplicate route matching)
1266        // Handle static file and builtin routes
1267        if let Some(route_config) = ctx.route_config.clone() {
1268            if route_config.service_type == sentinel_config::ServiceType::Static {
1269                trace!(
1270                    correlation_id = %ctx.trace_id,
1271                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1272                    "Handling static file route"
1273                );
1274                // Create a minimal RouteMatch for the handler
1275                let route_match = crate::routing::RouteMatch {
1276                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1277                    config: route_config.clone(),
1278                };
1279                return self.handle_static_route(session, ctx, &route_match).await;
1280            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1281                trace!(
1282                    correlation_id = %ctx.trace_id,
1283                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1284                    builtin_handler = ?route_config.builtin_handler,
1285                    "Handling builtin route"
1286                );
1287                // Create a minimal RouteMatch for the handler
1288                let route_match = crate::routing::RouteMatch {
1289                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1290                    config: route_config.clone(),
1291                };
1292                return self.handle_builtin_route(session, ctx, &route_match).await;
1293            }
1294        }
1295
1296        // API schema validation for routes with a configured validator
1297        if let Some(route_id) = ctx.route_id.clone() {
1298            if let Some(validator) = self.validators.get(&route_id).await {
1299                trace!(
1300                    correlation_id = %ctx.trace_id,
1301                    route_id = %route_id,
1302                    "Running API schema validation"
1303                );
1304                if let Some(result) = self
1305                    .validate_api_request(session, ctx, &route_id, &validator)
1306                    .await?
1307                {
1308                    debug!(
1309                        correlation_id = %ctx.trace_id,
1310                        route_id = %route_id,
1311                        validation_passed = result,
1312                        "API validation complete"
1313                    );
1314                    return Ok(result);
1315                }
1316            }
1317        }
1318
1319        // Get client address before mutable borrow
1320        let client_addr = session
1321            .client_addr()
1322            .map(|a| format!("{}", a))
1323            .unwrap_or_else(|| "unknown".to_string());
1324        let client_port = session.client_addr().map(|_| 0).unwrap_or(0); // client port is not currently extracted; always 0
1325
1326        let req_header = session.req_header_mut();
1327
1328        // Add correlation ID header
1329        req_header
1330            .insert_header("X-Correlation-Id", &ctx.trace_id)
1331            .ok();
1332        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1333
1334        // Use cached config (set in upstream_peer, or fetch now if needed)
1335        let config = ctx
1336            .config
1337            .get_or_insert_with(|| self.config_manager.current());
1338
1339        // Enforce header limits (fast path: skip if limits are very high)
1340        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
1341
1342        // Header count check - O(1)
1343        let header_count = req_header.headers.len();
1344        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1345            && header_count > config.limits.max_header_count
1346        {
1347            warn!(
1348                correlation_id = %ctx.trace_id,
1349                header_count = header_count,
1350                limit = config.limits.max_header_count,
1351                "Request blocked: exceeds header count limit"
1352            );
1353
1354            self.metrics.record_blocked_request("header_count_exceeded");
1355            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1356        }
1357
1358        // Header size check - O(n), skip if limit is very high
1359        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1360            let total_header_size: usize = req_header
1361                .headers
1362                .iter()
1363                .map(|(k, v)| k.as_str().len() + v.len())
1364                .sum();
1365
1366            if total_header_size > config.limits.max_header_size_bytes {
1367                warn!(
1368                    correlation_id = %ctx.trace_id,
1369                    header_size = total_header_size,
1370                    limit = config.limits.max_header_size_bytes,
1371                    "Request blocked: exceeds header size limit"
1372                );
1373
1374                self.metrics.record_blocked_request("header_size_exceeded");
1375                return Err(Error::explain(
1376                    ErrorType::InternalError,
1377                    "Headers too large",
1378                ));
1379            }
1380        }
1381
1382        // Process through external agents
1383        trace!(
1384            correlation_id = %ctx.trace_id,
1385            "Processing request through agents"
1386        );
1387        if let Err(e) = self
1388            .process_agents(session, ctx, &client_addr, client_port)
1389            .await
1390        {
1391            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
1392            // In that case, we need to send a proper HTTP response instead of just closing the connection
1393            if let ErrorType::HTTPStatus(status) = e.etype() {
1394                // Extract the message from the error (the context part after "HTTPStatus context:")
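                // Illustrative only (not the exact Pingora Display format): an error rendered as
                // "HTTPStatus(403) context: blocked by agent" yields the body "blocked by agent".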
1395                let error_msg = e.to_string();
1396                let body = error_msg
1397                    .split("context:")
1398                    .nth(1)
1399                    .map(|s| s.trim())
1400                    .unwrap_or("Request blocked");
1401                debug!(
1402                    correlation_id = %ctx.trace_id,
1403                    status = status,
1404                    body = %body,
1405                    "Sending HTTP error response for agent block"
1406                );
1407                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1408                return Ok(true); // Request complete, don't continue to upstream
1409            }
1410            // For other errors, propagate them
1411            return Err(e);
1412        }
1413
1414        trace!(
1415            correlation_id = %ctx.trace_id,
1416            "Request filter phase complete, forwarding to upstream"
1417        );
1418
1419        Ok(false) // Continue processing
1420    }
1421
1422    /// Process incoming request body chunks.
1423    /// Used for body size enforcement and WAF/agent inspection.
1424    ///
1425    /// Supports three modes:
1426    /// - **Buffer mode** (default): Buffer chunks until end of stream or the inspection limit, then send to agents
1427    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
    /// - **Hybrid mode**: Buffer up to a configured threshold, then stream the remainder
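    ///
    /// A rough sketch of how a 100 KB body flows in each mode (illustrative values;
    /// the threshold below is an example, not a default):
    ///
    /// ```ignore
    /// use sentinel_config::BodyStreamingMode;
    ///
    /// // Buffer: accumulate chunks (up to the inspection limit), send once at end of stream.
    /// let _ = BodyStreamingMode::Buffer;
    /// // Stream: forward each chunk to the agents as soon as it arrives.
    /// let _ = BodyStreamingMode::Stream;
    /// // Hybrid: buffer the first 64 KB, then stream the remaining 36 KB chunk by chunk.
    /// let _ = BodyStreamingMode::Hybrid { buffer_threshold: 64 * 1024 };
    /// ```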
1428    async fn request_body_filter(
1429        &self,
1430        _session: &mut Session,
1431        body: &mut Option<Bytes>,
1432        end_of_stream: bool,
1433        ctx: &mut Self::CTX,
1434    ) -> Result<(), Box<Error>> {
1435        use sentinel_config::BodyStreamingMode;
1436
1437        // Handle WebSocket frame inspection (client -> server)
1438        if ctx.is_websocket_upgrade {
1439            if let Some(ref handler) = ctx.websocket_handler {
1440                let result = handler.process_client_data(body.take()).await;
1441                match result {
1442                    crate::websocket::ProcessResult::Forward(data) => {
1443                        *body = data;
1444                    }
1445                    crate::websocket::ProcessResult::Close(reason) => {
1446                        warn!(
1447                            correlation_id = %ctx.trace_id,
1448                            code = reason.code,
1449                            reason = %reason.reason,
1450                            "WebSocket connection closed by agent (client->server)"
1451                        );
1452                        // Return an error to close the connection
1453                        return Err(Error::explain(
1454                            ErrorType::InternalError,
1455                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
1456                        ));
1457                    }
1458                }
1459            }
1460            // Skip normal body processing for WebSocket
1461            return Ok(());
1462        }
1463
1464        // Track request body size
1465        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1466        if chunk_len > 0 {
1467            ctx.request_body_bytes += chunk_len as u64;
1468
1469            trace!(
1470                correlation_id = %ctx.trace_id,
1471                chunk_size = chunk_len,
1472                total_body_bytes = ctx.request_body_bytes,
1473                end_of_stream = end_of_stream,
1474                streaming_mode = ?ctx.request_body_streaming_mode,
1475                "Processing request body chunk"
1476            );
1477
1478            // Check body size limit (use cached config)
1479            let config = ctx
1480                .config
1481                .get_or_insert_with(|| self.config_manager.current());
1482            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1483                warn!(
1484                    correlation_id = %ctx.trace_id,
1485                    body_bytes = ctx.request_body_bytes,
1486                    limit = config.limits.max_body_size_bytes,
1487                    "Request body size limit exceeded"
1488                );
1489                self.metrics.record_blocked_request("body_size_exceeded");
1490                return Err(Error::explain(
1491                    ErrorType::InternalError,
1492                    "Request body too large",
1493                ));
1494            }
1495        }
1496
1497        // Body inspection for agents (WAF, etc.)
1498        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1499            let config = ctx
1500                .config
1501                .get_or_insert_with(|| self.config_manager.current());
1502            let max_inspection_bytes = config
1503                .waf
1504                .as_ref()
1505                .map(|w| w.body_inspection.max_inspection_bytes as u64)
1506                .unwrap_or(1024 * 1024);
1507
1508            match ctx.request_body_streaming_mode {
1509                BodyStreamingMode::Stream => {
1510                    // Stream mode: send each chunk immediately
1511                    if body.is_some() {
1512                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1513                            .await?;
1514                    } else if end_of_stream && ctx.agent_needs_more {
1515                        // Send final empty chunk to signal end
1516                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1517                            .await?;
1518                    }
1519                }
1520                BodyStreamingMode::Hybrid { buffer_threshold } => {
1521                    // Hybrid mode: buffer up to threshold, then stream
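                    // Worked example: with a 64 KB threshold and a single 100 KB chunk,
                    // the first 64 KB is buffered and flushed to the agents, then the
                    // remaining 36 KB is handed to the streaming path below.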
1522                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1523                        // Still in buffering phase
1524                        if let Some(ref chunk) = body {
1525                            let bytes_to_buffer = std::cmp::min(
1526                                chunk.len(),
1527                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1528                            );
1529                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1530                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1531
1532                            // If we've reached threshold or end of stream, switch to streaming
1533                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1534                            {
1535                                // Send buffered content first
1536                                self.send_buffered_body_to_agents(
1537                                    end_of_stream && chunk.len() == bytes_to_buffer,
1538                                    ctx,
1539                                )
1540                                .await?;
1541                                ctx.body_buffer.clear();
1542
1543                                // If there's remaining data in this chunk, stream it
1544                                if bytes_to_buffer < chunk.len() {
1545                                    let remaining = chunk.slice(bytes_to_buffer..);
1546                                    let mut remaining_body = Some(remaining);
1547                                    self.process_body_chunk_streaming(
1548                                        &mut remaining_body,
1549                                        end_of_stream,
1550                                        ctx,
1551                                    )
1552                                    .await?;
1553                                }
1554                            }
1555                        }
1556                    } else {
1557                        // Past threshold, stream directly
1558                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1559                            .await?;
1560                    }
1561                }
1562                BodyStreamingMode::Buffer => {
1563                    // Buffer mode: collect chunks until ready to send
1564                    if let Some(ref chunk) = body {
1565                        if ctx.body_bytes_inspected < max_inspection_bytes {
1566                            let bytes_to_inspect = std::cmp::min(
1567                                chunk.len() as u64,
1568                                max_inspection_bytes - ctx.body_bytes_inspected,
1569                            ) as usize;
1570
1571                            ctx.body_buffer
1572                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1573                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1574
1575                            trace!(
1576                                correlation_id = %ctx.trace_id,
1577                                bytes_inspected = ctx.body_bytes_inspected,
1578                                max_inspection_bytes = max_inspection_bytes,
1579                                buffer_size = ctx.body_buffer.len(),
1580                                "Buffering body for agent inspection"
1581                            );
1582                        }
1583                    }
1584
1585                    // Send when complete or limit reached
1586                    let should_send =
1587                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1588                    if should_send && !ctx.body_buffer.is_empty() {
1589                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1590                            .await?;
1591                        ctx.body_buffer.clear();
1592                    }
1593                }
1594            }
1595        }
1596
1597        if end_of_stream {
1598            trace!(
1599                correlation_id = %ctx.trace_id,
1600                total_body_bytes = ctx.request_body_bytes,
1601                bytes_inspected = ctx.body_bytes_inspected,
1602                "Request body complete"
1603            );
1604        }
1605
1606        Ok(())
1607    }
1608
1609    async fn response_filter(
1610        &self,
1611        _session: &mut Session,
1612        upstream_response: &mut ResponseHeader,
1613        ctx: &mut Self::CTX,
1614    ) -> Result<(), Box<Error>> {
1615        let status = upstream_response.status.as_u16();
1616        let duration = ctx.elapsed();
1617
1618        trace!(
1619            correlation_id = %ctx.trace_id,
1620            status = status,
1621            "Starting response filter phase"
1622        );
1623
1624        // Handle WebSocket 101 Switching Protocols
1625        if status == 101 && ctx.is_websocket_upgrade {
1626            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1627                // Create WebSocket inspector and handler with metrics
1628                let inspector = crate::websocket::WebSocketInspector::with_metrics(
1629                    self.agent_manager.clone(),
1630                    ctx.route_id
1631                        .clone()
1632                        .unwrap_or_else(|| "unknown".to_string()),
1633                    ctx.trace_id.clone(),
1634                    ctx.client_ip.clone(),
1635                    100, // 100ms timeout per frame inspection
1636                    Some(self.metrics.clone()),
1637                );
1638
1639                let handler = crate::websocket::WebSocketHandler::new(
1640                    std::sync::Arc::new(inspector),
1641                    1024 * 1024, // 1MB max frame size
1642                );
1643
1644                ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1645
1646                info!(
1647                    correlation_id = %ctx.trace_id,
1648                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1649                    agent_count = ctx.websocket_inspection_agents.len(),
1650                    "WebSocket upgrade successful, frame inspection enabled"
1651                );
1652            } else if ctx.websocket_skip_inspection {
1653                debug!(
1654                    correlation_id = %ctx.trace_id,
1655                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1656                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1657                );
1658            } else {
1659                debug!(
1660                    correlation_id = %ctx.trace_id,
1661                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1662                    "WebSocket upgrade successful"
1663                );
1664            }
1665        }
1666
1667        // Apply security headers
1668        trace!(
1669            correlation_id = %ctx.trace_id,
1670            "Applying security headers"
1671        );
1672        self.apply_security_headers(upstream_response).ok();
1673
1674        // Add correlation ID to response
1675        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1676
1677        // Add rate limit headers if rate limiting was applied
1678        if let Some(ref rate_info) = ctx.rate_limit_info {
1679            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1680            upstream_response
1681                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1682            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1683        }
1684
1685        // Add token budget headers if budget tracking was enabled
1686        if ctx.inference_budget_enabled {
1687            if let Some(remaining) = ctx.inference_budget_remaining {
1688                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1689            }
1690            if let Some(period_reset) = ctx.inference_budget_period_reset {
1691                // Format as ISO 8601 timestamp
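                // e.g. a period_reset of 1735689600 becomes "2025-01-01T00:00:00Z"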
1692                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1693                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1694                    .unwrap_or_else(|| period_reset.to_string());
1695                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1696            }
1697        }
1698
1699        // Add GeoIP country header if geo lookup was performed
1700        if let Some(ref country_code) = ctx.geo_country_code {
1701            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1702        }
1703
1704        // Add sticky session cookie if a new assignment was made
1705        if ctx.sticky_session_new_assignment {
1706            if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1707                upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1708                trace!(
1709                    correlation_id = %ctx.trace_id,
1710                    sticky_target_index = ?ctx.sticky_target_index,
1711                    "Added sticky session Set-Cookie header"
1712                );
1713            }
1714        }
1715
1716        // Add guardrail warning header if prompt injection was detected (warn mode)
1717        if ctx.guardrail_warning {
1718            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1719        }
1720
1721        // Add fallback routing headers if fallback was used
1722        if ctx.used_fallback() {
1723            upstream_response.insert_header("X-Fallback-Used", "true")?;
1724
1725            if let Some(ref upstream) = ctx.upstream {
1726                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1727            }
1728
1729            if let Some(ref reason) = ctx.fallback_reason {
1730                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1731            }
1732
1733            if let Some(ref original) = ctx.original_upstream {
1734                upstream_response.insert_header("X-Original-Upstream", original)?;
1735            }
1736
1737            if let Some(ref mapping) = ctx.model_mapping_applied {
1738                upstream_response.insert_header(
1739                    "X-Model-Mapping",
1740                    format!("{} -> {}", mapping.0, mapping.1),
1741                )?;
1742            }
1743
1744            trace!(
1745                correlation_id = %ctx.trace_id,
1746                fallback_attempt = ctx.fallback_attempt,
1747                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1748                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1749                "Added fallback response headers"
1750            );
1751
1752            // Record fallback success metrics for successful responses (2xx/3xx)
1753            if status < 400 {
1754                if let Some(metrics) = get_fallback_metrics() {
1755                    metrics.record_fallback_success(
1756                        ctx.route_id.as_deref().unwrap_or("unknown"),
1757                        ctx.upstream.as_deref().unwrap_or("unknown"),
1758                    );
1759                }
1760            }
1761        }
1762
1763        // Initialize streaming token counter for SSE responses on inference routes
1764        if ctx.inference_rate_limit_enabled {
1765            // Check if this is an SSE response
1766            let content_type = upstream_response
1767                .headers
1768                .get("content-type")
1769                .and_then(|ct| ct.to_str().ok());
1770
1771            if is_sse_response(content_type) {
1772                // Get provider from route config
1773                let provider = ctx
1774                    .route_config
1775                    .as_ref()
1776                    .and_then(|r| r.inference.as_ref())
1777                    .map(|i| i.provider.clone())
1778                    .unwrap_or_default();
1779
1780                ctx.inference_streaming_response = true;
1781                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1782                    provider,
1783                    ctx.inference_model.clone(),
1784                ));
1785
1786                trace!(
1787                    correlation_id = %ctx.trace_id,
1788                    content_type = ?content_type,
1789                    model = ?ctx.inference_model,
1790                    "Initialized streaming token counter for SSE response"
1791                );
1792            }
1793        }
1794
1795        // Generate custom error pages for error responses
1796        if status >= 400 {
1797            trace!(
1798                correlation_id = %ctx.trace_id,
1799                status = status,
1800                "Handling error response"
1801            );
1802            self.handle_error_response(upstream_response, ctx).await?;
1803        }
1804
1805        // Record metrics
1806        self.metrics.record_request(
1807            ctx.route_id.as_deref().unwrap_or("unknown"),
1808            &ctx.method,
1809            status,
1810            duration,
1811        );
1812
1813        // Record OpenTelemetry span status
1814        if let Some(ref mut span) = ctx.otel_span {
1815            span.set_status(status);
1816            if let Some(ref upstream) = ctx.upstream {
1817                span.set_upstream(upstream, "");
1818            }
1819            if status >= 500 {
1820                span.record_error(&format!("HTTP {}", status));
1821            }
1822        }
1823
1824        // Record passive health check
1825        if let Some(ref upstream) = ctx.upstream {
1826            let success = status < 500;
1827
1828            trace!(
1829                correlation_id = %ctx.trace_id,
1830                upstream = %upstream,
1831                success = success,
1832                status = status,
1833                "Recording passive health check result"
1834            );
1835
1836            let error_msg = if !success {
1837                Some(format!("HTTP {}", status))
1838            } else {
1839                None
1840            };
1841            self.passive_health
1842                .record_outcome(upstream, success, error_msg.as_deref())
1843                .await;
1844
1845            // Report to upstream pool
1846            if let Some(pool) = self.upstream_pools.get(upstream).await {
1847                pool.report_result(upstream, success).await;
1848            }
1849
1850            if !success {
1851                warn!(
1852                    correlation_id = %ctx.trace_id,
1853                    upstream = %upstream,
1854                    status = status,
1855                    "Upstream returned error status"
1856                );
1857            }
1858        }
1859
1860        // Final request completion log
1861        if status >= 500 {
1862            error!(
1863                correlation_id = %ctx.trace_id,
1864                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1865                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1866                method = %ctx.method,
1867                path = %ctx.path,
1868                status = status,
1869                duration_ms = duration.as_millis(),
1870                attempts = ctx.upstream_attempts,
1871                "Request completed with server error"
1872            );
1873        } else if status >= 400 {
1874            warn!(
1875                correlation_id = %ctx.trace_id,
1876                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1877                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1878                method = %ctx.method,
1879                path = %ctx.path,
1880                status = status,
1881                duration_ms = duration.as_millis(),
1882                "Request completed with client error"
1883            );
1884        } else {
1885            debug!(
1886                correlation_id = %ctx.trace_id,
1887                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1888                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1889                method = %ctx.method,
1890                path = %ctx.path,
1891                status = status,
1892                duration_ms = duration.as_millis(),
1893                attempts = ctx.upstream_attempts,
1894                "Request completed"
1895            );
1896        }
1897
1898        Ok(())
1899    }
1900
1901    /// Modify the request before sending to upstream.
1902    /// Used for header modifications, adding authentication, etc.
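    ///
    /// Illustrative effect of a route's header policy (the values are examples, not a
    /// real configuration):
    ///
    /// ```ignore
    /// // set:    "X-Api-Version: 2"  -> insert_header(), replacing any existing value
    /// // add:    "X-Tag: canary"     -> append_header(), keeping existing values
    /// // remove: "Cookie"            -> remove_header() before forwarding upstream
    /// ```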
1903    async fn upstream_request_filter(
1904        &self,
1905        _session: &mut Session,
1906        upstream_request: &mut pingora::http::RequestHeader,
1907        ctx: &mut Self::CTX,
1908    ) -> Result<()>
1909    where
1910        Self::CTX: Send + Sync,
1911    {
1912        trace!(
1913            correlation_id = %ctx.trace_id,
1914            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1915            "Applying upstream request modifications"
1916        );
1917
1918        // Add trace ID header for upstream correlation
1919        upstream_request
1920            .insert_header("X-Trace-Id", &ctx.trace_id)
1921            .ok();
1922
1923        // Add W3C traceparent header for distributed tracing
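        // W3C format: "{version}-{trace_id}-{parent_span_id}-{flags}", e.g.
        //   00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01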
1924        if let Some(ref span) = ctx.otel_span {
1925            let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1926            let traceparent =
1927                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1928            upstream_request
1929                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1930                .ok();
1931        }
1932
1933        // Add request metadata headers
1934        upstream_request
1935            .insert_header("X-Forwarded-By", "Sentinel")
1936            .ok();
1937
1938        // Apply route-specific request header modifications
1939        // Note: Pingora's IntoCaseHeaderName requires owned String for header names,
1940        // so we clone names but pass values by reference to avoid cloning both.
1941        if let Some(ref route_config) = ctx.route_config {
1942            let mods = &route_config.policies.request_headers;
1943
1944            // Set headers (overwrite existing)
1945            for (name, value) in &mods.set {
1946                upstream_request.insert_header(name.clone(), value.as_str()).ok();
1947            }
1948
1949            // Add headers (append)
1950            for (name, value) in &mods.add {
1951                upstream_request.append_header(name.clone(), value.as_str()).ok();
1952            }
1953
1954            // Remove headers
1955            for name in &mods.remove {
1956                upstream_request.remove_header(name);
1957            }
1958
1959            trace!(
1960                correlation_id = %ctx.trace_id,
1961                "Applied request header modifications"
1962            );
1963        }
1964
1965        // Remove sensitive headers that shouldn't go to upstream
1966        upstream_request.remove_header("X-Internal-Token");
1967        upstream_request.remove_header("Authorization-Internal");
1968
1969        // === Traffic Mirroring / Shadowing ===
1970        // Check if this route has shadow configuration
1971        if let Some(ref route_config) = ctx.route_config {
1972            if let Some(ref shadow_config) = route_config.shadow {
1973                // Get snapshot of upstream pools for shadow manager
1974                let pools_snapshot = self.upstream_pools.snapshot().await;
1975                let upstream_pools = std::sync::Arc::new(pools_snapshot);
1976
1977                // Get route ID for metrics labeling
1978                let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1979
1980                // Create shadow manager
1981                let shadow_manager = crate::shadow::ShadowManager::new(
1982                    upstream_pools,
1983                    shadow_config.clone(),
1984                    Some(std::sync::Arc::clone(&self.metrics)),
1985                    route_id,
1986                );
1987
1988                // Check if we should shadow this request (sampling + header check)
1989                if shadow_manager.should_shadow(upstream_request) {
1990                    trace!(
1991                        correlation_id = %ctx.trace_id,
1992                        shadow_upstream = %shadow_config.upstream,
1993                        percentage = shadow_config.percentage,
1994                        "Shadowing request"
1995                    );
1996
1997                    // Clone headers for shadow request
1998                    let shadow_headers = upstream_request.clone();
1999
2000                    // Create request context for shadow (simplified from proxy context)
2001                    let shadow_ctx = crate::upstream::RequestContext {
2002                        client_ip: ctx.client_ip.parse().ok(),
2003                        headers: std::collections::HashMap::new(), // Empty for now
2004                        path: ctx.path.clone(),
2005                        method: ctx.method.clone(),
2006                    };
2007
2008                    // Determine if we should buffer the body
2009                    let buffer_body = shadow_config.buffer_body
2010                        && crate::shadow::should_buffer_method(&ctx.method);
2011
2012                    if buffer_body {
2013                        // Body buffering requested - defer shadow request until body is available
2014                        // Store shadow info in context; will be fired in logging phase
2015                        // or when request body filter completes
2016                        trace!(
2017                            correlation_id = %ctx.trace_id,
2018                            "Deferring shadow request until body is buffered"
2019                        );
2020                        ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2021                            headers: shadow_headers,
2022                            manager: std::sync::Arc::new(shadow_manager),
2023                            request_ctx: shadow_ctx,
2024                            include_body: true,
2025                        });
2026                        // Enable body inspection to capture the body for shadow
2027                        // (only if not already enabled for other reasons)
2028                        if !ctx.body_inspection_enabled {
2029                            ctx.body_inspection_enabled = true;
2030                            // Set a reasonable buffer limit from shadow config
2031                            // body_buffer is expected to accumulate chunks for the deferred
2032                            // shadow request; no separate buffer limit is configured here
2033                    } else {
2034                        // No body buffering needed - fire shadow request immediately
2035                        shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2036                        ctx.shadow_sent = true;
2037                    }
2038                }
2039            }
2040        }
2041
2042        Ok(())
2043    }
2044
2045    /// Process response body chunks from upstream.
2046    /// Used for response size tracking and WAF inspection.
2047    ///
2048    /// Note: Response body inspection is currently buffered only (streaming mode not supported
2049    /// for responses due to Pingora's synchronous filter design).
2050    fn response_body_filter(
2051        &self,
2052        _session: &mut Session,
2053        body: &mut Option<Bytes>,
2054        end_of_stream: bool,
2055        ctx: &mut Self::CTX,
2056    ) -> Result<Option<Duration>, Box<Error>> {
2057        // Handle WebSocket frame inspection (server -> client)
2058        // Note: This filter is synchronous, so we use block_in_place for async agent calls
2059        if ctx.is_websocket_upgrade {
2060            if let Some(ref handler) = ctx.websocket_handler {
2061                let handler = handler.clone();
2062                let data = body.take();
2063
2064                // Use block_in_place to run async handler from sync context
2065                // This is safe because Pingora uses a multi-threaded tokio runtime
2066                let result = tokio::task::block_in_place(|| {
2067                    tokio::runtime::Handle::current()
2068                        .block_on(async { handler.process_server_data(data).await })
2069                });
2070
2071                match result {
2072                    crate::websocket::ProcessResult::Forward(data) => {
2073                        *body = data;
2074                    }
2075                    crate::websocket::ProcessResult::Close(reason) => {
2076                        warn!(
2077                            correlation_id = %ctx.trace_id,
2078                            code = reason.code,
2079                            reason = %reason.reason,
2080                            "WebSocket connection closed by agent (server->client)"
2081                        );
2082                        // For sync filter, we can't return an error that closes the connection
2083                        // Instead, inject a close frame
2084                        let close_frame =
2085                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2086                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2087                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2088                            *body = Some(Bytes::from(encoded));
2089                        }
2090                    }
2091                }
2092            }
2093            // Skip normal body processing for WebSocket
2094            return Ok(None);
2095        }
2096
2097        // Track response body size
2098        if let Some(ref chunk) = body {
2099            ctx.response_bytes += chunk.len() as u64;
2100
2101            trace!(
2102                correlation_id = %ctx.trace_id,
2103                chunk_size = chunk.len(),
2104                total_response_bytes = ctx.response_bytes,
2105                end_of_stream = end_of_stream,
2106                "Processing response body chunk"
2107            );
2108
2109            // Process SSE chunks for streaming token counting
2110            if let Some(ref mut counter) = ctx.inference_streaming_counter {
2111                let result = counter.process_chunk(chunk);
2112
2113                if result.content.is_some() || result.is_done {
2114                    trace!(
2115                        correlation_id = %ctx.trace_id,
2116                        has_content = result.content.is_some(),
2117                        is_done = result.is_done,
2118                        chunks_processed = counter.chunks_processed(),
2119                        accumulated_content_len = counter.content().len(),
2120                        "Processed SSE chunk for token counting"
2121                    );
2122                }
2123            }
2124
2125            // Response body inspection (buffered mode only)
2126            // Note: Streaming mode for response bodies is not currently supported
2127            // due to Pingora's synchronous response_body_filter design
2128            if ctx.response_body_inspection_enabled
2129                && !ctx.response_body_inspection_agents.is_empty()
2130            {
2131                let config = ctx
2132                    .config
2133                    .get_or_insert_with(|| self.config_manager.current());
2134                let max_inspection_bytes = config
2135                    .waf
2136                    .as_ref()
2137                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
2138                    .unwrap_or(1024 * 1024);
2139
2140                if ctx.response_body_bytes_inspected < max_inspection_bytes {
2141                    let bytes_to_inspect = std::cmp::min(
2142                        chunk.len() as u64,
2143                        max_inspection_bytes - ctx.response_body_bytes_inspected,
2144                    ) as usize;
2145
2146                    // Only counters are tracked here; response body inspection runs
2147                    // asynchronously (during the logging phase) and its results are
2148                    // logged rather than blocking the response
2149                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2150                    ctx.response_body_chunk_index += 1;
2151
2152                    trace!(
2153                        correlation_id = %ctx.trace_id,
2154                        bytes_inspected = ctx.response_body_bytes_inspected,
2155                        max_inspection_bytes = max_inspection_bytes,
2156                        chunk_index = ctx.response_body_chunk_index,
2157                        "Tracking response body for inspection"
2158                    );
2159                }
2160            }
2161        }
2162
2163        if end_of_stream {
2164            trace!(
2165                correlation_id = %ctx.trace_id,
2166                total_response_bytes = ctx.response_bytes,
2167                response_bytes_inspected = ctx.response_body_bytes_inspected,
2168                "Response body complete"
2169            );
2170        }
2171
2172        // Return None to indicate no delay needed
2173        Ok(None)
2174    }
2175
2176    /// Called when a connection to upstream is established or reused.
2177    /// Logs connection reuse statistics for observability.
2178    async fn connected_to_upstream(
2179        &self,
2180        _session: &mut Session,
2181        reused: bool,
2182        peer: &HttpPeer,
2183        #[cfg(unix)] _fd: RawFd,
2184        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2185        digest: Option<&Digest>,
2186        ctx: &mut Self::CTX,
2187    ) -> Result<(), Box<Error>> {
2188        // Track connection reuse for metrics
2189        ctx.connection_reused = reused;
2190
2191        // Log connection establishment/reuse
2192        if reused {
2193            trace!(
2194                correlation_id = %ctx.trace_id,
2195                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2196                peer_address = %peer.address(),
2197                "Reusing existing upstream connection"
2198            );
2199        } else {
2200            debug!(
2201                correlation_id = %ctx.trace_id,
2202                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2203                peer_address = %peer.address(),
2204                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2205                "Established new upstream connection"
2206            );
2207        }
2208
2209        Ok(())
2210    }
2211
2212    // =========================================================================
2213    // HTTP Caching - Pingora Cache Integration
2214    // =========================================================================
2215
2216    /// Decide if the request should use caching.
2217    ///
2218    /// This method is called early in the request lifecycle to determine if
2219    /// the response should be served from cache or if the response should
2220    /// be cached.
2221    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2222        // Check if route has caching enabled
2223        let route_id = match ctx.route_id.as_deref() {
2224            Some(id) => id,
2225            None => {
2226                trace!(
2227                    correlation_id = %ctx.trace_id,
2228                    "Cache filter: no route ID, skipping cache"
2229                );
2230                return Ok(());
2231            }
2232        };
2233
2234        // Check if caching is enabled for this route
2235        if !self.cache_manager.is_enabled(route_id) {
2236            trace!(
2237                correlation_id = %ctx.trace_id,
2238                route_id = %route_id,
2239                "Cache disabled for route"
2240            );
2241            return Ok(());
2242        }
2243
2244        // Check if method is cacheable (typically GET/HEAD)
2245        if !self
2246            .cache_manager
2247            .is_method_cacheable(route_id, &ctx.method)
2248        {
2249            trace!(
2250                correlation_id = %ctx.trace_id,
2251                route_id = %route_id,
2252                method = %ctx.method,
2253                "Method not cacheable"
2254            );
2255            return Ok(());
2256        }
2257
2258        // Enable caching for this request using Pingora's cache infrastructure
2259        debug!(
2260            correlation_id = %ctx.trace_id,
2261            route_id = %route_id,
2262            method = %ctx.method,
2263            path = %ctx.path,
2264            "Enabling HTTP caching for request"
2265        );
2266
2267        // Get static references to cache infrastructure
2268        let storage = get_cache_storage();
2269        let eviction = get_cache_eviction();
2270        let cache_lock = get_cache_lock();
2271
2272        // Enable the cache with storage, eviction, and lock
2273        session.cache.enable(
2274            storage,
2275            Some(eviction),
2276            None, // predictor - optional
2277            Some(cache_lock),
2278            None, // option overrides
2279        );
2280
2281        // Mark request as cache-eligible in context
2282        ctx.cache_eligible = true;
2283
2284        trace!(
2285            correlation_id = %ctx.trace_id,
2286            route_id = %route_id,
2287            cache_enabled = session.cache.enabled(),
2288            "Cache enabled for request"
2289        );
2290
2291        Ok(())
2292    }
2293
2294    /// Generate the cache key for this request.
2295    ///
2296    /// The cache key uniquely identifies the cached response. It typically
2297    /// includes the method, host, path, and potentially query parameters.
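    ///
    /// Sketch of the inputs that distinguish cache entries (the stored key format is
    /// internal to Pingora):
    ///
    /// ```ignore
    /// // ("GET",  "example.com", "/api/items", Some("page=2")) -> key A
    /// // ("GET",  "example.com", "/api/items", Some("page=3")) -> key B (different query)
    /// // ("HEAD", "example.com", "/api/items", Some("page=2")) -> key C (different method)
    /// ```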
2298    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2299        let req_header = session.req_header();
2300        let method = req_header.method.as_str();
2301        let path = req_header.uri.path();
2302        let host = ctx.host.as_deref().unwrap_or("unknown");
2303        let query = req_header.uri.query();
2304
2305        // Generate a human-readable key for tracing (the lookup key below uses Pingora's default generator)
2306        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2307
2308        trace!(
2309            correlation_id = %ctx.trace_id,
2310            cache_key = %key_string,
2311            "Generated cache key"
2312        );
2313
2314        // Use Pingora's default cache key generator which handles
2315        // proper hashing and internal format
2316        Ok(CacheKey::default(req_header))
2317    }
2318
2319    /// Called when a cache miss occurs.
2320    ///
2321    /// This is called when the cache lookup found no matching entry.
2322    /// We can use this to log and track cache misses.
2323    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2324        // Let Pingora handle the cache miss
2325        session.cache.cache_miss();
2326
2327        // Track statistics
2328        if let Some(route_id) = ctx.route_id.as_deref() {
2329            self.cache_manager.stats().record_miss();
2330
2331            trace!(
2332                correlation_id = %ctx.trace_id,
2333                route_id = %route_id,
2334                path = %ctx.path,
2335                "Cache miss"
2336            );
2337        }
2338    }
2339
2340    /// Called after a successful cache lookup.
2341    ///
2342    /// This filter allows inspecting the cached response before serving it.
2343    /// Returns `None` to serve the cached response, or a `ForcedFreshness`
2344    /// to override the freshness decision.
2345    async fn cache_hit_filter(
2346        &self,
2347        session: &mut Session,
2348        meta: &CacheMeta,
2349        _hit_handler: &mut HitHandler,
2350        is_fresh: bool,
2351        ctx: &mut Self::CTX,
2352    ) -> Result<Option<ForcedFreshness>>
2353    where
2354        Self::CTX: Send + Sync,
2355    {
2356        // Check if this cache entry should be invalidated due to a purge request
2357        let req_header = session.req_header();
2358        let method = req_header.method.as_str();
2359        let path = req_header.uri.path();
2360        let host = req_header.uri.host().unwrap_or("localhost");
2361        let query = req_header.uri.query();
2362
2363        // Generate the cache key for this request
2364        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2365
2366        // Check if this key should be invalidated
2367        if self.cache_manager.should_invalidate(&cache_key) {
2368            info!(
2369                correlation_id = %ctx.trace_id,
2370                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2371                cache_key = %cache_key,
2372                "Cache entry invalidated by purge request"
2373            );
2374            // Force expiration so the entry is refetched from upstream
2375            return Ok(Some(ForcedFreshness::ForceExpired));
2376        }
2377
2378        // Track cache hit statistics
2379        if is_fresh {
2380            self.cache_manager.stats().record_hit();
2381
2382            debug!(
2383                correlation_id = %ctx.trace_id,
2384                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2385                is_fresh = is_fresh,
2386                "Cache hit (fresh)"
2387            );
2388        } else {
2389            trace!(
2390                correlation_id = %ctx.trace_id,
2391                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2392                is_fresh = is_fresh,
2393                "Cache hit (stale)"
2394            );
2395        }
2396
2397        // Serve the cached response without invalidation
2398        Ok(None)
2399    }
2400
2401    /// Decide if the response should be cached.
2402    ///
2403    /// Called after receiving the response from upstream to determine
2404    /// if it should be stored in the cache.
2405    fn response_cache_filter(
2406        &self,
2407        _session: &Session,
2408        resp: &ResponseHeader,
2409        ctx: &mut Self::CTX,
2410    ) -> Result<RespCacheable> {
2411        let route_id = match ctx.route_id.as_deref() {
2412            Some(id) => id,
2413            None => {
2414                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2415                    "no_route",
2416                )));
2417            }
2418        };
2419
2420        // Check if caching is enabled for this route
2421        if !self.cache_manager.is_enabled(route_id) {
2422            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2423                "disabled",
2424            )));
2425        }
2426
2427        let status = resp.status.as_u16();
2428
2429        // Check if status code is cacheable
2430        if !self.cache_manager.is_status_cacheable(route_id, status) {
2431            trace!(
2432                correlation_id = %ctx.trace_id,
2433                route_id = %route_id,
2434                status = status,
2435                "Status code not cacheable"
2436            );
2437            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2438        }
2439
2440        // Check Cache-Control header for no-store, no-cache, private
2441        if let Some(cache_control) = resp.headers.get("cache-control") {
2442            if let Ok(cc_str) = cache_control.to_str() {
2443                if crate::cache::CacheManager::is_no_cache(cc_str) {
2444                    trace!(
2445                        correlation_id = %ctx.trace_id,
2446                        route_id = %route_id,
2447                        cache_control = %cc_str,
2448                        "Response has no-cache directive"
2449                    );
2450                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2451                }
2452            }
2453        }
2454
2455        // Calculate TTL from Cache-Control or use default
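        // e.g. "Cache-Control: max-age=300" typically yields a 300s TTL; without a usable
        // directive the route's configured default applies (exact precedence is decided
        // by CacheManager::calculate_ttl).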
2456        let cache_control = resp
2457            .headers
2458            .get("cache-control")
2459            .and_then(|v| v.to_str().ok());
2460        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2461
2462        if ttl.is_zero() {
2463            trace!(
2464                correlation_id = %ctx.trace_id,
2465                route_id = %route_id,
2466                "TTL is zero, not caching"
2467            );
2468            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2469        }
2470
2471        // Get route cache config for stale settings
2472        let config = self
2473            .cache_manager
2474            .get_route_config(route_id)
2475            .unwrap_or_default();
2476
2477        // Create timestamps for cache metadata
2478        let now = std::time::SystemTime::now();
2479        let fresh_until = now + ttl;
2480
2481        // Clone the response header for storage
2482        let header = resp.clone();
2483
2484        // Create CacheMeta with proper timestamps and TTLs
2485        let cache_meta = CacheMeta::new(
2486            fresh_until,
2487            now,
2488            config.stale_while_revalidate_secs as u32,
2489            config.stale_if_error_secs as u32,
2490            header,
2491        );
2492
2493        // Record the store in cache statistics
2494        self.cache_manager.stats().record_store();
2495
2496        debug!(
2497            correlation_id = %ctx.trace_id,
2498            route_id = %route_id,
2499            status = status,
2500            ttl_secs = ttl.as_secs(),
2501            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2502            stale_if_error_secs = config.stale_if_error_secs,
2503            "Caching response"
2504        );
2505
2506        Ok(RespCacheable::Cacheable(cache_meta))
2507    }
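    // Illustrative sketch (an assumption about what `calculate_ttl` does internally; the
    // real implementation lives in the cache module): a typical TTL derivation prefers an
    // explicit `max-age` directive and falls back to the route's configured default:
    //
    //     use std::time::Duration;
    //
    //     fn ttl_from_cache_control(cc: Option<&str>, default_ttl: Duration) -> Duration {
    //         cc.and_then(|v| {
    //             v.split(',')
    //                 .filter_map(|d| d.trim().strip_prefix("max-age="))
    //                 .filter_map(|secs| secs.parse::<u64>().ok())
    //                 .next()
    //         })
    //         .map(Duration::from_secs)
    //         .unwrap_or(default_ttl)
    //     }
    //
    //     // "public, max-age=300" -> 300s; missing or unparsable header -> default_ttl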
2508
2509    /// Decide whether to serve stale content on error or during revalidation.
2510    ///
2511    /// This implements stale-while-revalidate and stale-if-error semantics.
2512    fn should_serve_stale(
2513        &self,
2514        _session: &mut Session,
2515        ctx: &mut Self::CTX,
2516        error: Option<&Error>,
2517    ) -> bool {
2518        let route_id = match ctx.route_id.as_deref() {
2519            Some(id) => id,
2520            None => return false,
2521        };
2522
2523        // Get route cache config for stale settings
2524        let config = match self.cache_manager.get_route_config(route_id) {
2525            Some(c) => c,
2526            None => return false,
2527        };
2528
2529        // If there's an upstream error, use stale-if-error
2530        if let Some(e) = error {
2531            // Only serve stale for upstream errors
2532            if e.esource() == &pingora::ErrorSource::Upstream {
2533                debug!(
2534                    correlation_id = %ctx.trace_id,
2535                    route_id = %route_id,
2536                    error = %e,
2537                    stale_if_error_secs = config.stale_if_error_secs,
2538                    "Considering stale-if-error"
2539                );
2540                return config.stale_if_error_secs > 0;
2541            }
2542        }
2543
2544        // During stale-while-revalidate (error is None)
2545        if error.is_none() && config.stale_while_revalidate_secs > 0 {
2546            trace!(
2547                correlation_id = %ctx.trace_id,
2548                route_id = %route_id,
2549                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2550                "Allowing stale-while-revalidate"
2551            );
2552            return true;
2553        }
2554
2555        false
2556    }
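    // Orientation sketch (illustrative only): relative to the CacheMeta built in
    // response_cache_filter, the two stale windows extend past `fresh_until`:
    //
    //     created ----------- fresh_until ------------------------------------> time
    //         |---- fresh ----|-- stale-while-revalidate --|
    //         |---- fresh ----|------------ stale-if-error ------------|
    //
    //     // Decision summary (mirrors the logic above):
    //     //   upstream error && stale_if_error_secs > 0          -> serve stale
    //     //   no error (SWR) && stale_while_revalidate_secs > 0  -> serve stale
    //     //   otherwise                                          -> do not serve stale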
2557
2558    /// Handle Range header for byte-range requests (streaming support).
2559    ///
2560    /// This method is called when a Range header is present in the request.
2561    /// It allows proper handling of:
2562    /// - Video streaming (HTML5 video seeking)
2563    /// - Large file downloads with resume support
2564    /// - Partial content delivery
2565    ///
2566    /// Uses Pingora's built-in range handling with route-specific logging.
2567    fn range_header_filter(
2568        &self,
2569        session: &mut Session,
2570        response: &mut ResponseHeader,
2571        ctx: &mut Self::CTX,
2572    ) -> pingora_proxy::RangeType
2573    where
2574        Self::CTX: Send + Sync,
2575    {
2576        // Check if route supports range requests
2577        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2578            // Static file and web routes should support range requests
2579            matches!(
2580                config.service_type,
2581                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2582            )
2583        });
2584
2585        if !supports_range {
2586            trace!(
2587                correlation_id = %ctx.trace_id,
2588                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2589                "Range request not supported for this route type"
2590            );
2591            return pingora_proxy::RangeType::None;
2592        }
2593
2594        // Use Pingora's built-in range header parsing and handling
2595        let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
2596
2597        match &range_type {
2598            pingora_proxy::RangeType::None => {
2599                trace!(
2600                    correlation_id = %ctx.trace_id,
2601                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2602                    "No range request or not applicable"
2603                );
2604            }
2605            pingora_proxy::RangeType::Single(range) => {
2606                trace!(
2607                    correlation_id = %ctx.trace_id,
2608                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2609                    range_start = range.start,
2610                    range_end = range.end,
2611                    "Processing single-range request"
2612                );
2613            }
2614            pingora_proxy::RangeType::Multi(multi) => {
2615                trace!(
2616                    correlation_id = %ctx.trace_id,
2617                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2618                    range_count = multi.ranges.len(),
2619                    "Processing multi-range request"
2620                );
2621            }
2622            pingora_proxy::RangeType::Invalid => {
2623                debug!(
2624                    correlation_id = %ctx.trace_id,
2625                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2626                    "Invalid range header"
2627                );
2628            }
2629        }
2630
2631        range_type
2632    }
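    // Illustrative mapping (generic HTTP range semantics, values made up; Pingora's parser
    // does the actual work above):
    //
    //     // Range: bytes=0-1023        -> RangeType::Single -> 206 Partial Content
    //     //                               Content-Range: bytes 0-1023/146515
    //     // Range: bytes=0-99,200-299  -> RangeType::Multi  -> 206 multipart/byteranges
    //     // Range: bytes=9999999-      -> RangeType::Invalid (unsatisfiable; commonly
    //     //                               answered with 416 Range Not Satisfiable)
    //     // no Range header            -> RangeType::None   -> 200 OK with the full body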
2633
2634    /// Handle fatal proxy errors by generating custom error pages.
2635    /// Called when the proxy itself fails to process the request.
2636    async fn fail_to_proxy(
2637        &self,
2638        session: &mut Session,
2639        e: &Error,
2640        ctx: &mut Self::CTX,
2641    ) -> pingora_proxy::FailToProxy
2642    where
2643        Self::CTX: Send + Sync,
2644    {
2645        let error_code = match e.etype() {
2646            // Connection errors
2647            ErrorType::ConnectRefused => 503,
2648            ErrorType::ConnectTimedout => 504,
2649            ErrorType::ConnectNoRoute => 502,
2650
2651            // Timeout errors
2652            ErrorType::ReadTimedout => 504,
2653            ErrorType::WriteTimedout => 504,
2654
2655            // TLS errors
2656            ErrorType::TLSHandshakeFailure => 502,
2657            ErrorType::InvalidCert => 502,
2658
2659            // Protocol errors
2660            ErrorType::InvalidHTTPHeader => 400,
2661            ErrorType::H2Error => 502,
2662
2663            // Resource errors
2664            ErrorType::ConnectProxyFailure => 502,
2665            ErrorType::ConnectionClosed => 502,
2666
2667            // Explicit HTTP status (e.g., from agent fail-closed blocking)
2668            ErrorType::HTTPStatus(status) => *status,
2669
2670            // Internal errors - return 502 for upstream issues (more accurate than 500)
2671            ErrorType::InternalError => {
2672                // Check if this is an upstream-related error
2673                let error_str = e.to_string();
2674                if error_str.contains("upstream")
2675                    || error_str.contains("DNS")
2676                    || error_str.contains("resolve")
2677                {
2678                    502
2679                } else {
2680                    500
2681                }
2682            }
2683
2684            // Default to 502 for unknown errors
2685            _ => 502,
2686        };
2687
2688        error!(
2689            correlation_id = %ctx.trace_id,
2690            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2691            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2692            error_type = ?e.etype(),
2693            error = %e,
2694            error_code = error_code,
2695            "Proxy error occurred"
2696        );
2697
2698        // Record the error in metrics
2699        self.metrics
2700            .record_blocked_request(&format!("proxy_error_{}", error_code));
2701
2702        // Write error response to ensure client receives a proper HTTP response
2703        // This is necessary because some errors occur before the upstream connection
2704        // is established, and Pingora may not send a response automatically
2705        let error_message = match error_code {
2706            400 => "Bad Request",
2707            502 => "Bad Gateway",
2708            503 => "Service Unavailable",
2709            504 => "Gateway Timeout",
2710            _ => "Internal Server Error",
2711        };
2712
2713        // Build a minimal error response body
2714        let body = format!(
2715            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2716            error_code, error_message, ctx.trace_id
2717        );
2718
2719        // Write the response header
2720        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2721        header
2722            .insert_header("Content-Type", "application/json")
2723            .ok();
2724        header
2725            .insert_header("Content-Length", body.len().to_string())
2726            .ok();
2727        header
2728            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2729            .ok();
2730        header.insert_header("Connection", "close").ok();
2731
2732        // Write headers and body
2733        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2734            warn!(
2735                correlation_id = %ctx.trace_id,
2736                error = %write_err,
2737                "Failed to write error response header"
2738            );
2739        } else {
2740            // Write the body
2741            if let Err(write_err) = session
2742                .write_response_body(Some(bytes::Bytes::from(body)), true)
2743                .await
2744            {
2745                warn!(
2746                    correlation_id = %ctx.trace_id,
2747                    error = %write_err,
2748                    "Failed to write error response body"
2749                );
2750            }
2751        }
2752
2753        // Return the error response info
2754        // can_reuse_downstream: false since we already wrote and closed the response
2755        pingora_proxy::FailToProxy {
2756            error_code,
2757            can_reuse_downstream: false,
2758        }
2759    }
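    // Example of what the client receives when fail_to_proxy handles a connect timeout
    // (ErrorType::ConnectTimedout -> 504); values are illustrative, but the body shape
    // matches the `format!` above:
    //
    //     HTTP/1.1 504 Gateway Timeout
    //     Content-Type: application/json
    //     Content-Length: 51
    //     X-Correlation-Id: abc123
    //     Connection: close
    //
    //     {"error":"504 Gateway Timeout","trace_id":"abc123"}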
2760
2761    /// Handle errors that occur during proxying after upstream connection is established.
2762    ///
2763    /// This method enables retry logic and circuit breaker integration.
2764    /// It's called when an error occurs during the request/response exchange
2765    /// with the upstream server.
2766    fn error_while_proxy(
2767        &self,
2768        peer: &HttpPeer,
2769        session: &mut Session,
2770        e: Box<Error>,
2771        ctx: &mut Self::CTX,
2772        client_reused: bool,
2773    ) -> Box<Error> {
2774        let error_type = e.etype().clone();
2775        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2776
2777        // Classify error for retry decisions
2778        let is_retryable = matches!(
2779            error_type,
2780            ErrorType::ConnectTimedout
2781                | ErrorType::ReadTimedout
2782                | ErrorType::WriteTimedout
2783                | ErrorType::ConnectionClosed
2784                | ErrorType::ConnectRefused
2785        );
2786
2787        // Log the error with context
2788        warn!(
2789            correlation_id = %ctx.trace_id,
2790            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2791            upstream = %upstream_id,
2792            peer_address = %peer.address(),
2793            error_type = ?error_type,
2794            error = %e,
2795            client_reused = client_reused,
2796            is_retryable = is_retryable,
2797            "Error during proxy operation"
2798        );
2799
2800        // Record failure with circuit breaker via upstream pool
2801        // This is done asynchronously since we can't await in a sync fn
2802        let peer_address = peer.address().to_string();
2803        let upstream_pools = self.upstream_pools.clone();
2804        let upstream_id_owned = upstream_id.to_string();
2805        tokio::spawn(async move {
2806            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2807                pool.report_result(&peer_address, false).await;
2808            }
2809        });
2810
2811        // Metrics tracking
2812        self.metrics
2813            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2814
2815        // Create enhanced error with retry information
2816        let mut enhanced_error = e.more_context(format!(
2817            "Upstream: {}, Peer: {}, Attempts: {}",
2818            upstream_id,
2819            peer.address(),
2820            ctx.upstream_attempts
2821        ));
2822
2823        // Determine whether a retry should be attempted:
2824        // - Only retry if the error is a retryable type
2825        // - For reused connections, only retry if the retry buffer is still intact
2826        // - Log the decision so retries are observable
2827        if is_retryable {
2828            let can_retry = if client_reused {
2829                // For reused connections, check if retry buffer is intact
2830                !session.as_ref().retry_buffer_truncated()
2831            } else {
2832                // Fresh connections can always retry
2833                true
2834            };
2835
2836            enhanced_error.retry.decide_reuse(can_retry);
2837
2838            if can_retry {
2839                debug!(
2840                    correlation_id = %ctx.trace_id,
2841                    upstream = %upstream_id,
2842                    error_type = ?error_type,
2843                    "Error is retryable, will attempt retry"
2844                );
2845            }
2846        } else {
2847            // Non-retryable error - don't retry
2848            enhanced_error.retry.decide_reuse(false);
2849        }
2850
2851        enhanced_error
2852    }
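    // The retry decision above, extracted as a standalone sketch for clarity (same logic;
    // `buffer_truncated` stands in for `session.as_ref().retry_buffer_truncated()`):
    //
    //     fn can_retry(retryable: bool, client_reused: bool, buffer_truncated: bool) -> bool {
    //         // A reused downstream connection may have already streamed part of the request
    //         // body; it is only safe to replay if the retry buffer still holds all of it.
    //         retryable && (!client_reused || !buffer_truncated)
    //     }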
2853
2854    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2855        // Decrement active requests
2856        self.reload_coordinator.dec_requests();
2857
2858        // === Fire pending shadow request (if body buffering was enabled) ===
2859        if !ctx.shadow_sent {
2860            if let Some(shadow_pending) = ctx.shadow_pending.take() {
2861                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2862                    // Clone the buffered body for the shadow request
2863                    Some(ctx.body_buffer.clone())
2864                } else {
2865                    None
2866                };
2867
2868                trace!(
2869                    correlation_id = %ctx.trace_id,
2870                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2871                    "Firing deferred shadow request with buffered body"
2872                );
2873
2874                shadow_pending.manager.shadow_request(
2875                    shadow_pending.headers,
2876                    body,
2877                    shadow_pending.request_ctx,
2878                );
2879                ctx.shadow_sent = true;
2880            }
2881        }
2882
2883        let duration = ctx.elapsed();
2884
2885        // Get response status
2886        let status = session
2887            .response_written()
2888            .map(|r| r.status.as_u16())
2889            .unwrap_or(0);
2890
2891        // Report result to load balancer for adaptive LB feedback
2892        // This enables latency-aware weight adjustment
2893        if let (Some(ref peer_addr), Some(ref upstream_id)) =
2894            (&ctx.selected_upstream_address, &ctx.upstream)
2895        {
2896            // Success = a response was written (status > 0) with a non-5xx status; 4xx client errors are not upstream failures
2897            let success = status > 0 && status < 500;
2898
2899            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2900                pool.report_result_with_latency(peer_addr, success, Some(duration))
2901                    .await;
2902                trace!(
2903                    correlation_id = %ctx.trace_id,
2904                    upstream = %upstream_id,
2905                    peer_address = %peer_addr,
2906                    success = success,
2907                    duration_ms = duration.as_millis(),
2908                    status = status,
2909                    "Reported result to adaptive load balancer"
2910                );
2911            }
2912
2913            // Track warmth for inference routes (cold model detection)
2914            if ctx.inference_rate_limit_enabled && success {
2915                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2916                if cold_detected {
2917                    debug!(
2918                        correlation_id = %ctx.trace_id,
2919                        upstream = %upstream_id,
2920                        peer_address = %peer_addr,
2921                        duration_ms = duration.as_millis(),
2922                        "Cold model detected on inference upstream"
2923                    );
2924                }
2925            }
2926        }
2927
2928        // Record actual token usage for inference rate limiting
2929        // This adjusts the token bucket based on actual vs estimated tokens
2930        if ctx.inference_rate_limit_enabled {
2931            if let (Some(route_id), Some(ref rate_limit_key)) =
2932                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2933            {
2934                // Try to extract actual tokens from response headers
2935                let response_headers = session
2936                    .response_written()
2937                    .map(|r| &r.headers)
2938                    .cloned()
2939                    .unwrap_or_default();
2940
2941                // For streaming responses, finalize the streaming token counter
2942                let streaming_result = if ctx.inference_streaming_response {
2943                    ctx.inference_streaming_counter
2944                        .as_ref()
2945                        .map(|counter| counter.finalize())
2946                } else {
2947                    None
2948                };
2949
2950                // Log streaming token count info
2951                if let Some(ref result) = streaming_result {
2952                    debug!(
2953                        correlation_id = %ctx.trace_id,
2954                        output_tokens = result.output_tokens,
2955                        input_tokens = ?result.input_tokens,
2956                        source = ?result.source,
2957                        content_length = result.content_length,
2958                        "Finalized streaming token count"
2959                    );
2960                }
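                // Illustrative sketch (an assumption about how a streaming token counter
                // works, not StreamingTokenCounter's actual implementation): accumulate the
                // SSE `data:` payloads as chunks pass through, then at finalize() prefer an
                // API-reported usage object and fall back to a tokenizer count:
                //
                //     for line in chunk_str.lines() {
                //         if let Some(payload) = line.strip_prefix("data: ") {
                //             if payload != "[DONE]" {
                //                 accumulated.push_str(payload);
                //             }
                //         }
                //     }
                //     // finalize(): usage.total_tokens if the API emitted one,
                //     // otherwise an estimate over `accumulated` (e.g. via tiktoken)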
2961
2962                // PII detection guardrail (for streaming inference responses)
2963                if ctx.inference_streaming_response {
2964                    if let Some(ref route_config) = ctx.route_config {
2965                        if let Some(ref inference) = route_config.inference {
2966                            if let Some(ref guardrails) = inference.guardrails {
2967                                if let Some(ref pii_config) = guardrails.pii_detection {
2968                                    if pii_config.enabled {
2969                                        // Get accumulated content from streaming counter
2970                                        if let Some(ref counter) = ctx.inference_streaming_counter {
2971                                            let response_content = counter.content();
2972                                            if !response_content.is_empty() {
2973                                                let pii_result = self
2974                                                    .guardrail_processor
2975                                                    .check_pii(
2976                                                        pii_config,
2977                                                        response_content,
2978                                                        ctx.route_id.as_deref(),
2979                                                        &ctx.trace_id,
2980                                                    )
2981                                                    .await;
2982
2983                                                match pii_result {
2984                                                    crate::inference::PiiCheckResult::Detected {
2985                                                        detections,
2986                                                        redacted_content: _,
2987                                                    } => {
2988                                                        warn!(
2989                                                            correlation_id = %ctx.trace_id,
2990                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2991                                                            detection_count = detections.len(),
2992                                                            "PII detected in inference response"
2993                                                        );
2994
2995                                                        // Store detection categories for logging
2996                                                        ctx.pii_detection_categories = detections
2997                                                            .iter()
2998                                                            .map(|d| d.category.clone())
2999                                                            .collect();
3000
3001                                                        // Record metrics for each category
3002                                                        for detection in &detections {
3003                                                            self.metrics.record_pii_detected(
3004                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
3005                                                                &detection.category,
3006                                                            );
3007                                                        }
3008                                                    }
3009                                                    crate::inference::PiiCheckResult::Clean => {
3010                                                        trace!(
3011                                                            correlation_id = %ctx.trace_id,
3012                                                            "No PII detected in response"
3013                                                        );
3014                                                    }
3015                                                    crate::inference::PiiCheckResult::Error { message } => {
3016                                                        debug!(
3017                                                            correlation_id = %ctx.trace_id,
3018                                                            error = %message,
3019                                                            "PII detection check failed"
3020                                                        );
3021                                                    }
3022                                                }
3023                                            }
3024                                        }
3025                                    }
3026                                }
3027                            }
3028                        }
3029                    }
3030                }
3031
3032                // Reading the full response body for token counts would require buffering, which is expensive.
3033                // For non-streaming responses, most LLM APIs report token usage in headers;
3034                // for streaming, we use the SSE content accumulated by the streaming counter.
3035                let empty_body: &[u8] = &[];
3036
3037                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3038                    route_id,
3039                    rate_limit_key,
3040                    &response_headers,
3041                    empty_body,
3042                    ctx.inference_estimated_tokens,
3043                ) {
3044                    // Use streaming result if available and header extraction failed
3045                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
3046                        // Prefer API-provided counts from streaming, otherwise use tiktoken count
3047                        if streaming.total_tokens.is_some() {
3048                            (streaming.total_tokens.unwrap(), "streaming_api")
3049                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
3050                            // Header extraction failed, use streaming tiktoken count
3051                            // Estimate total by adding input estimate + output from streaming
3052                            let total = ctx.inference_input_tokens + streaming.output_tokens;
3053                            (total, "streaming_tiktoken")
3054                        } else {
3055                            (actual_estimate.tokens, "headers")
3056                        }
3057                    } else {
3058                        (actual_estimate.tokens, "headers")
3059                    };
3060
3061                    ctx.inference_actual_tokens = Some(actual_tokens);
3062
3063                    debug!(
3064                        correlation_id = %ctx.trace_id,
3065                        route_id = route_id,
3066                        estimated_tokens = ctx.inference_estimated_tokens,
3067                        actual_tokens = actual_tokens,
3068                        source = source_info,
3069                        streaming_response = ctx.inference_streaming_response,
3070                        model = ?ctx.inference_model,
3071                        "Recorded actual inference tokens"
3072                    );
3073
3074                    // Record budget usage with actual tokens (if budget tracking enabled)
3075                    if ctx.inference_budget_enabled {
3076                        let alerts = self.inference_rate_limit_manager.record_budget(
3077                            route_id,
3078                            rate_limit_key,
3079                            actual_tokens,
3080                        );
3081
3082                        // Log any budget alerts that fired
3083                        for alert in alerts.iter() {
3084                            warn!(
3085                                correlation_id = %ctx.trace_id,
3086                                route_id = route_id,
3087                                tenant = %alert.tenant,
3088                                threshold_pct = alert.threshold * 100.0,
3089                                tokens_used = alert.tokens_used,
3090                                tokens_limit = alert.tokens_limit,
3091                                "Token budget alert threshold crossed"
3092                            );
3093                        }
3094
3095                        // Update context with remaining budget
3096                        if let Some(status) = self.inference_rate_limit_manager.budget_status(
3097                            route_id,
3098                            rate_limit_key,
3099                        ) {
3100                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3101                        }
3102                    }
3103
3104                    // Calculate cost if cost attribution is enabled
3105                    if ctx.inference_cost_enabled {
3106                        if let Some(model) = ctx.inference_model.as_deref() {
3107                            // Use streaming result for more accurate input/output split if available
3108                            let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3109                                // Streaming gives us accurate output tokens
3110                                let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3111                                let output = streaming.output_tokens;
3112                                (input, output)
3113                            } else {
3114                                // Fallback: estimate output from total - input
3115                                let input = ctx.inference_input_tokens;
3116                                let output = actual_tokens.saturating_sub(input);
3117                                (input, output)
3118                            };
3119                            ctx.inference_output_tokens = output_tokens;
3120
3121                            if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3122                                route_id,
3123                                model,
3124                                input_tokens,
3125                                output_tokens,
3126                            ) {
3127                                ctx.inference_request_cost = Some(cost_result.total_cost);
3128
3129                                trace!(
3130                                    correlation_id = %ctx.trace_id,
3131                                    route_id = route_id,
3132                                    model = model,
3133                                    input_tokens = input_tokens,
3134                                    output_tokens = output_tokens,
3135                                    total_cost = cost_result.total_cost,
3136                                    currency = %cost_result.currency,
3137                                    "Calculated inference request cost"
3138                                );
3139                            }
3140                        }
3141                    }
3142                }
3143            }
3144        }
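        // Worked example (made-up numbers) of the reconciliation above: a request admitted
        // with a 600-token estimate, a 150-token input estimate, and a streaming counter
        // that measured 380 output tokens with no API-reported usage would settle as:
        //
        //     actual_tokens = 150 + 380 = 530        (source = "streaming_tiktoken")
        //     budget debit  = 530 tokens             (the 600-token estimate is corrected)
        //     cost          = 150 * input_price + 380 * output_price   (if cost attribution is on)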
3145
3146        // Write to access log file if configured (check sampling before allocating entry)
3147        if self.log_manager.should_log_access(status) {
3148            let access_entry = AccessLogEntry {
3149                timestamp: chrono::Utc::now().to_rfc3339(),
3150                trace_id: ctx.trace_id.clone(),
3151                method: ctx.method.clone(),
3152                path: ctx.path.clone(),
3153                query: ctx.query.clone(),
3154                protocol: "HTTP/1.1".to_string(),
3155                status,
3156                body_bytes: ctx.response_bytes,
3157                duration_ms: duration.as_millis() as u64,
3158                client_ip: ctx.client_ip.clone(),
3159                user_agent: ctx.user_agent.clone(),
3160                referer: ctx.referer.clone(),
3161                host: ctx.host.clone(),
3162                route_id: ctx.route_id.clone(),
3163                upstream: ctx.upstream.clone(),
3164                upstream_attempts: ctx.upstream_attempts,
3165                instance_id: self.app_state.instance_id.clone(),
3166                namespace: ctx.namespace.clone(),
3167                service: ctx.service.clone(),
3168                // Extended access log fields
3169                body_bytes_sent: ctx.response_bytes,
3170                upstream_addr: ctx.selected_upstream_address.clone(),
3171                connection_reused: ctx.connection_reused,
3172                rate_limit_hit: status == 429,
3173                geo_country: ctx.geo_country_code.clone(),
3174            };
3175            self.log_manager.log_access(&access_entry);
3176        }
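        // Illustrative access-log record (assuming the log manager serializes entries as
        // JSON lines; field names come from AccessLogEntry above, values are made up):
        //
        //     {"timestamp":"2024-05-01T12:00:00+00:00","trace_id":"abc123","method":"GET",
        //      "path":"/api/v1/items","status":200,"duration_ms":42,"client_ip":"203.0.113.7",
        //      "route_id":"api","upstream":"api-pool","upstream_attempts":1,
        //      "connection_reused":true,"rate_limit_hit":false,"geo_country":"DE"}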
3177
3178        // Log to tracing at debug level (avoid allocations if debug disabled)
3179        if tracing::enabled!(tracing::Level::DEBUG) {
3180            debug!(
3181                trace_id = %ctx.trace_id,
3182                method = %ctx.method,
3183                path = %ctx.path,
3184                route_id = ?ctx.route_id,
3185                upstream = ?ctx.upstream,
3186                status = status,
3187                duration_ms = duration.as_millis() as u64,
3188                upstream_attempts = ctx.upstream_attempts,
3189                error = ?_error.map(|e| e.to_string()),
3190                "Request completed"
3191            );
3192        }
3193
3194        // Log WebSocket upgrades at info level
3195        if ctx.is_websocket_upgrade && status == 101 {
3196            info!(
3197                trace_id = %ctx.trace_id,
3198                route_id = ?ctx.route_id,
3199                upstream = ?ctx.upstream,
3200                client_ip = %ctx.client_ip,
3201                "WebSocket connection established"
3202            );
3203        }
3204
3205        // End OpenTelemetry span
3206        if let Some(span) = ctx.otel_span.take() {
3207            span.end();
3208        }
3209    }
3210}
3211
3212// =============================================================================
3213// Helper methods for body streaming (not part of ProxyHttp trait)
3214// =============================================================================
3215
3216impl SentinelProxy {
3217    /// Process a single body chunk in streaming mode.
3218    async fn process_body_chunk_streaming(
3219        &self,
3220        body: &mut Option<Bytes>,
3221        end_of_stream: bool,
3222        ctx: &mut RequestContext,
3223    ) -> Result<(), Box<Error>> {
3224        // Clone the chunk data to avoid borrowing issues when mutating body later
3225        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3226        let chunk_index = ctx.request_body_chunk_index;
3227        ctx.request_body_chunk_index += 1;
3228        ctx.body_bytes_inspected += chunk_data.len() as u64;
3229
3230        debug!(
3231            correlation_id = %ctx.trace_id,
3232            chunk_index = chunk_index,
3233            chunk_size = chunk_data.len(),
3234            end_of_stream = end_of_stream,
3235            "Streaming body chunk to agents"
3236        );
3237
3238        // Create agent call context
3239        let agent_ctx = crate::agents::AgentCallContext {
3240            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3241            metadata: sentinel_agent_protocol::RequestMetadata {
3242                correlation_id: ctx.trace_id.clone(),
3243                request_id: ctx.trace_id.clone(),
3244                client_ip: ctx.client_ip.clone(),
3245                client_port: 0,
3246                server_name: ctx.host.clone(),
3247                protocol: "HTTP/1.1".to_string(),
3248                tls_version: None,
3249                tls_cipher: None,
3250                route_id: ctx.route_id.clone(),
3251                upstream_id: ctx.upstream.clone(),
3252                timestamp: chrono::Utc::now().to_rfc3339(),
3253                traceparent: ctx.traceparent(),
3254            },
3255            route_id: ctx.route_id.clone(),
3256            upstream_id: ctx.upstream.clone(),
3257            request_body: None, // Not used in streaming mode
3258            response_body: None,
3259        };
3260
3261        let agent_ids = ctx.body_inspection_agents.clone();
3262        let total_size = None; // Unknown in streaming mode
3263
3264        match self
3265            .agent_manager
3266            .process_request_body_streaming(
3267                &agent_ctx,
3268                &chunk_data,
3269                end_of_stream,
3270                chunk_index,
3271                ctx.body_bytes_inspected as usize,
3272                total_size,
3273                &agent_ids,
3274            )
3275            .await
3276        {
3277            Ok(decision) => {
3278                // Track if agent needs more data
3279                ctx.agent_needs_more = decision.needs_more;
3280
3281                // Apply body mutation if present
3282                if let Some(ref mutation) = decision.request_body_mutation {
3283                    if !mutation.is_pass_through() {
3284                        if mutation.is_drop() {
3285                            // Drop the chunk
3286                            *body = None;
3287                            trace!(
3288                                correlation_id = %ctx.trace_id,
3289                                chunk_index = chunk_index,
3290                                "Agent dropped body chunk"
3291                            );
3292                        } else if let Some(ref new_data) = mutation.data {
3293                            // Replace chunk with mutated content
3294                            *body = Some(Bytes::from(new_data.clone()));
3295                            trace!(
3296                                correlation_id = %ctx.trace_id,
3297                                chunk_index = chunk_index,
3298                                original_size = chunk_data.len(),
3299                                new_size = new_data.len(),
3300                                "Agent mutated body chunk"
3301                            );
3302                        }
3303                    }
3304                }
3305
3306                // Check decision (only final if needs_more is false)
3307                if !decision.needs_more && !decision.is_allow() {
3308                    warn!(
3309                        correlation_id = %ctx.trace_id,
3310                        action = ?decision.action,
3311                        "Agent blocked request body"
3312                    );
3313                    self.metrics.record_blocked_request("agent_body_inspection");
3314
3315                    let (status, message) = match &decision.action {
3316                        crate::agents::AgentAction::Block { status, body, .. } => (
3317                            *status,
3318                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3319                        ),
3320                        _ => (403, "Forbidden".to_string()),
3321                    };
3322
3323                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3324                }
3325
3326                trace!(
3327                    correlation_id = %ctx.trace_id,
3328                    needs_more = decision.needs_more,
3329                    "Agent processed body chunk"
3330                );
3331            }
3332            Err(e) => {
3333                let fail_closed = ctx
3334                    .route_config
3335                    .as_ref()
3336                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3337                    .unwrap_or(false);
3338
3339                if fail_closed {
3340                    error!(
3341                        correlation_id = %ctx.trace_id,
3342                        error = %e,
3343                        "Agent streaming body inspection failed, blocking (fail-closed)"
3344                    );
3345                    return Err(Error::explain(
3346                        ErrorType::HTTPStatus(503),
3347                        "Service unavailable",
3348                    ));
3349                } else {
3350                    warn!(
3351                        correlation_id = %ctx.trace_id,
3352                        error = %e,
3353                        "Agent streaming body inspection failed, allowing (fail-open)"
3354                    );
3355                }
3356            }
3357        }
3358
3359        Ok(())
3360    }
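    // Illustrative call sequence for the streaming path above (sizes made up), showing how
    // `needs_more` defers the final verdict until the last chunk:
    //
    //     chunk 0 (4 KiB,   end_of_stream = false) -> needs_more = true   (no verdict yet)
    //     chunk 1 (4 KiB,   end_of_stream = false) -> needs_more = true
    //     chunk 2 (1.2 KiB, end_of_stream = true)  -> needs_more = false  -> allow or block
    //
    //     // Only a final decision (needs_more == false) can block the request; earlier
    //     // chunks may still be mutated or dropped by the agent as they stream through.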
3361
3362    /// Send buffered body to agents (buffer mode).
3363    async fn send_buffered_body_to_agents(
3364        &self,
3365        end_of_stream: bool,
3366        ctx: &mut RequestContext,
3367    ) -> Result<(), Box<Error>> {
3368        debug!(
3369            correlation_id = %ctx.trace_id,
3370            buffer_size = ctx.body_buffer.len(),
3371            end_of_stream = end_of_stream,
3372            agent_count = ctx.body_inspection_agents.len(),
3373            decompression_enabled = ctx.decompression_enabled,
3374            "Sending buffered body to agents for inspection"
3375        );
3376
3377        // Decompress body if enabled and we have a supported encoding
3378        let body_for_inspection = if ctx.decompression_enabled {
3379            if let Some(ref encoding) = ctx.body_content_encoding {
3380                let config = crate::decompression::DecompressionConfig {
3381                    max_ratio: ctx.max_decompression_ratio,
3382                    max_output_bytes: ctx.max_decompression_bytes,
3383                };
3384
3385                match crate::decompression::decompress_body(
3386                    &ctx.body_buffer,
3387                    encoding,
3388                    &config,
3389                ) {
3390                    Ok(result) => {
3391                        ctx.body_was_decompressed = true;
3392                        self.metrics
3393                            .record_decompression_success(encoding, result.ratio);
3394                        debug!(
3395                            correlation_id = %ctx.trace_id,
3396                            encoding = %encoding,
3397                            compressed_size = result.compressed_size,
3398                            decompressed_size = result.decompressed_size,
3399                            ratio = result.ratio,
3400                            "Body decompressed for agent inspection"
3401                        );
3402                        result.data
3403                    }
3404                    Err(e) => {
3405                        // Record failure metric
3406                        let failure_reason = match &e {
3407                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
3408                                "ratio_exceeded"
3409                            }
3410                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
3411                                "size_exceeded"
3412                            }
3413                            crate::decompression::DecompressionError::InvalidData { .. } => {
3414                                "invalid_data"
3415                            }
3416                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3417                                "unsupported"
3418                            }
3419                            crate::decompression::DecompressionError::IoError(_) => "io_error",
3420                        };
3421                        self.metrics
3422                            .record_decompression_failure(encoding, failure_reason);
3423
3424                        // Decompression failed - decide based on failure mode
3425                        let fail_closed = ctx
3426                            .route_config
3427                            .as_ref()
3428                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3429                            .unwrap_or(false);
3430
3431                        if fail_closed {
3432                            error!(
3433                                correlation_id = %ctx.trace_id,
3434                                error = %e,
3435                                encoding = %encoding,
3436                                "Decompression failed, blocking (fail-closed)"
3437                            );
3438                            return Err(Error::explain(
3439                                ErrorType::HTTPStatus(400),
3440                                "Invalid compressed body",
3441                            ));
3442                        } else {
3443                            warn!(
3444                                correlation_id = %ctx.trace_id,
3445                                error = %e,
3446                                encoding = %encoding,
3447                                "Decompression failed, sending compressed body (fail-open)"
3448                            );
3449                            ctx.body_buffer.clone()
3450                        }
3451                    }
3452                }
3453            } else {
3454                ctx.body_buffer.clone()
3455            }
3456        } else {
3457            ctx.body_buffer.clone()
3458        };
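        // Illustrative sketch (an assumption about the decompression module's internals) of
        // the guard this config drives: both an expansion-ratio cap and an absolute output
        // cap protect against decompression bombs, surfacing as the RatioExceeded and
        // SizeExceeded errors handled above:
        //
        //     let too_big   = decompressed_len > max_output_bytes;
        //     let too_dense = decompressed_len as f64 > max_ratio * compressed_len as f64;
        //     // too_big   -> DecompressionError::SizeExceeded
        //     // too_dense -> DecompressionError::RatioExceeded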
3459
3460        let agent_ctx = crate::agents::AgentCallContext {
3461            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3462            metadata: sentinel_agent_protocol::RequestMetadata {
3463                correlation_id: ctx.trace_id.clone(),
3464                request_id: ctx.trace_id.clone(),
3465                client_ip: ctx.client_ip.clone(),
3466                client_port: 0,
3467                server_name: ctx.host.clone(),
3468                protocol: "HTTP/1.1".to_string(),
3469                tls_version: None,
3470                tls_cipher: None,
3471                route_id: ctx.route_id.clone(),
3472                upstream_id: ctx.upstream.clone(),
3473                timestamp: chrono::Utc::now().to_rfc3339(),
3474                traceparent: ctx.traceparent(),
3475            },
3476            route_id: ctx.route_id.clone(),
3477            upstream_id: ctx.upstream.clone(),
3478            request_body: Some(body_for_inspection.clone()),
3479            response_body: None,
3480        };
3481
3482        let agent_ids = ctx.body_inspection_agents.clone();
3483        match self
3484            .agent_manager
3485            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3486            .await
3487        {
3488            Ok(decision) => {
3489                if !decision.is_allow() {
3490                    warn!(
3491                        correlation_id = %ctx.trace_id,
3492                        action = ?decision.action,
3493                        "Agent blocked request body"
3494                    );
3495                    self.metrics.record_blocked_request("agent_body_inspection");
3496
3497                    let (status, message) = match &decision.action {
3498                        crate::agents::AgentAction::Block { status, body, .. } => (
3499                            *status,
3500                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3501                        ),
3502                        _ => (403, "Forbidden".to_string()),
3503                    };
3504
3505                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3506                }
3507
3508                trace!(
3509                    correlation_id = %ctx.trace_id,
3510                    "Agent allowed request body"
3511                );
3512            }
3513            Err(e) => {
3514                let fail_closed = ctx
3515                    .route_config
3516                    .as_ref()
3517                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3518                    .unwrap_or(false);
3519
3520                if fail_closed {
3521                    error!(
3522                        correlation_id = %ctx.trace_id,
3523                        error = %e,
3524                        "Agent body inspection failed, blocking (fail-closed)"
3525                    );
3526                    return Err(Error::explain(
3527                        ErrorType::HTTPStatus(503),
3528                        "Service unavailable",
3529                    ));
3530                } else {
3531                    warn!(
3532                        correlation_id = %ctx.trace_id,
3533                        error = %e,
3534                        "Agent body inspection failed, allowing (fail-open)"
3535                    );
3536                }
3537            }
3538        }
3539
3540        Ok(())
3541    }
3542}