sentinel_proxy/proxy/
http_trait.rs

1//! ProxyHttp trait implementation for SentinelProxy.
2//!
3//! This module contains the Pingora ProxyHttp trait implementation which defines
4//! the core request/response lifecycle handling.
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23    extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36/// Helper type for rate limiting when we don't need header access
37struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39    fn get_header(&self, _name: &str) -> Option<String> {
40        None
41    }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46    type CTX = RequestContext;
47
48    fn new_ctx(&self) -> Self::CTX {
49        RequestContext::new()
50    }
51
52    fn fail_to_connect(
53        &self,
54        _session: &mut Session,
55        peer: &HttpPeer,
56        ctx: &mut Self::CTX,
57        e: Box<Error>,
58    ) -> Box<Error> {
59        error!(
60            correlation_id = %ctx.trace_id,
61            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63            peer_address = %peer.address(),
64            error = %e,
65            "Failed to connect to upstream peer"
66        );
67        // Custom error pages are handled in response_filter
68        e
69    }
70
71    /// Early request filter - runs before upstream selection
72    /// Used to handle builtin routes that don't need an upstream connection
73    async fn early_request_filter(
74        &self,
75        session: &mut Session,
76        ctx: &mut Self::CTX,
77    ) -> Result<(), Box<Error>> {
78        // Extract request info for routing
79        let req_header = session.req_header();
80        let method = req_header.method.as_str();
81        let path = req_header.uri.path();
82        let host = req_header
83            .headers
84            .get("host")
85            .and_then(|h| h.to_str().ok())
86            .unwrap_or("");
87
88        // Handle ACME HTTP-01 challenges before any other processing
89        if let Some(ref challenge_manager) = self.acme_challenges {
90            if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
91                if let Some(key_authorization) = challenge_manager.get_response(token) {
92                    debug!(
93                        token = %token,
94                        "Serving ACME HTTP-01 challenge response"
95                    );
96
97                    // Build response
98                    let mut resp = ResponseHeader::build(200, None)?;
99                    resp.insert_header("Content-Type", "text/plain")?;
100                    resp.insert_header("Content-Length", key_authorization.len().to_string())?;
101
102                    // Send response
103                    session
104                        .write_response_header(Box::new(resp), false)
105                        .await?;
106                    session
107                        .write_response_body(Some(Bytes::from(key_authorization)), true)
108                        .await?;
109
110                    // Return error to signal request is complete
111                    return Err(Error::explain(
112                        ErrorType::InternalError,
113                        "ACME challenge served",
114                    ));
115                } else {
116                    // Token not found - could be a stale request or attack
117                    warn!(
118                        token = %token,
119                        "ACME challenge token not found"
120                    );
121                }
122            }
123        }
124
125        ctx.method = method.to_string();
126        ctx.path = path.to_string();
127        ctx.host = Some(host.to_string());
128
129        // Match route to determine service type
130        let route_match = {
131            let route_matcher = self.route_matcher.read();
132            let request_info = RequestInfo::new(method, path, host);
133            match route_matcher.match_request(&request_info) {
134                Some(m) => m,
135                None => return Ok(()), // No matching route, let upstream_peer handle it
136            }
137        };
138
139        ctx.trace_id = self.get_trace_id(session);
140        ctx.route_id = Some(route_match.route_id.to_string());
141        ctx.route_config = Some(route_match.config.clone());
142
143        // Parse incoming W3C trace context if present
144        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
145            if let Ok(s) = traceparent.to_str() {
146                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
147            }
148        }
149
150        // Start OpenTelemetry request span if tracing is enabled
151        if let Some(tracer) = crate::otel::get_tracer() {
152            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
153        }
154
155        // Check if this is a builtin handler route
156        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
157            trace!(
158                correlation_id = %ctx.trace_id,
159                route_id = %route_match.route_id,
160                builtin_handler = ?route_match.config.builtin_handler,
161                "Handling builtin route in early_request_filter"
162            );
163
164            // Handle the builtin route directly
165            let handled = self
166                .handle_builtin_route(session, ctx, &route_match)
167                .await?;
168
169            if handled {
170                // Return error to signal that request is complete (Pingora will not continue)
171                return Err(Error::explain(
172                    ErrorType::InternalError,
173                    "Builtin handler complete",
174                ));
175            }
176        }
177
178        Ok(())
179    }
180
181    async fn upstream_peer(
182        &self,
183        session: &mut Session,
184        ctx: &mut Self::CTX,
185    ) -> Result<Box<HttpPeer>, Box<Error>> {
186        // Track active request
187        self.reload_coordinator.inc_requests();
188
189        // Cache global config once per request (avoids repeated Arc clones)
190        if ctx.config.is_none() {
191            ctx.config = Some(self.config_manager.current());
192        }
193
194        // Cache client address for logging if not already set
195        if ctx.client_ip.is_empty() {
196            ctx.client_ip = session
197                .client_addr()
198                .map(|a| a.to_string())
199                .unwrap_or_else(|| "unknown".to_string());
200        }
201
202        let req_header = session.req_header();
203
204        // Cache request info for access logging if not already set
205        if ctx.method.is_empty() {
206            ctx.method = req_header.method.to_string();
207            ctx.path = req_header.uri.path().to_string();
208            ctx.query = req_header.uri.query().map(|q| q.to_string());
209            ctx.host = req_header
210                .headers
211                .get("host")
212                .and_then(|v| v.to_str().ok())
213                .map(|s| s.to_string());
214        }
215        ctx.user_agent = req_header
216            .headers
217            .get("user-agent")
218            .and_then(|v| v.to_str().ok())
219            .map(|s| s.to_string());
220        ctx.referer = req_header
221            .headers
222            .get("referer")
223            .and_then(|v| v.to_str().ok())
224            .map(|s| s.to_string());
225
226        trace!(
227            correlation_id = %ctx.trace_id,
228            client_ip = %ctx.client_ip,
229            "Request received, initializing context"
230        );
231
232        // Use cached route info if already set by early_request_filter
233        let route_match = if let Some(ref route_config) = ctx.route_config {
234            let route_id = ctx.route_id.as_deref().unwrap_or("");
235            crate::routing::RouteMatch {
236                route_id: sentinel_common::RouteId::new(route_id),
237                config: route_config.clone(),
238            }
239        } else {
240            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
241            let (match_result, route_duration) = {
242                let route_matcher = self.route_matcher.read();
243                let host = ctx.host.as_deref().unwrap_or("");
244
245                // Build request info (zero-copy for common case)
246                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
247
248                // Only build headers HashMap if any route needs header matching
249                if route_matcher.needs_headers() {
250                    request_info = request_info
251                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
252                }
253
254                // Only parse query params if any route needs query param matching
255                if route_matcher.needs_query_params() {
256                    request_info =
257                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
258                }
259
260                trace!(
261                    correlation_id = %ctx.trace_id,
262                    method = %request_info.method,
263                    path = %request_info.path,
264                    host = %request_info.host,
265                    "Built request info for route matching"
266                );
267
268                let route_start = std::time::Instant::now();
269                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
270                    warn!(
271                        correlation_id = %ctx.trace_id,
272                        method = %request_info.method,
273                        path = %request_info.path,
274                        host = %request_info.host,
275                        "No matching route found for request"
276                    );
277                    Error::explain(ErrorType::InternalError, "No matching route found")
278                })?;
279                let route_duration = route_start.elapsed();
280                // Lock is dropped here when block ends
281                (route_match, route_duration)
282            };
283
284            ctx.route_id = Some(match_result.route_id.to_string());
285            ctx.route_config = Some(match_result.config.clone());
286
287            // Set trace_id if not already set by early_request_filter
288            if ctx.trace_id.is_empty() {
289                ctx.trace_id = self.get_trace_id(session);
290
291                // Parse incoming W3C trace context if present
292                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
293                {
294                    if let Ok(s) = traceparent.to_str() {
295                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
296                    }
297                }
298
299                // Start OpenTelemetry request span if tracing is enabled
300                if let Some(tracer) = crate::otel::get_tracer() {
301                    ctx.otel_span = Some(tracer.start_span(
302                        &ctx.method,
303                        &ctx.path,
304                        ctx.trace_context.as_ref(),
305                    ));
306                }
307            }
308
309            trace!(
310                correlation_id = %ctx.trace_id,
311                route_id = %match_result.route_id,
312                route_duration_us = route_duration.as_micros(),
313                service_type = ?match_result.config.service_type,
314                "Route matched"
315            );
316            match_result
317        };
318
319        // Check if this is a builtin handler route (no upstream needed)
320        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
321            trace!(
322                correlation_id = %ctx.trace_id,
323                route_id = %route_match.route_id,
324                builtin_handler = ?route_match.config.builtin_handler,
325                "Route type is builtin, skipping upstream"
326            );
327            // Mark as builtin route for later processing in request_filter
328            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
329            // Return error to skip upstream connection for builtin routes
330            return Err(Error::explain(
331                ErrorType::InternalError,
332                "Builtin handler handled in request_filter",
333            ));
334        }
335
336        // Check if this is a static file route
337        if route_match.config.service_type == sentinel_config::ServiceType::Static {
338            trace!(
339                correlation_id = %ctx.trace_id,
340                route_id = %route_match.route_id,
341                "Route type is static, checking for static server"
342            );
343            // Static routes don't need an upstream
344            if self
345                .static_servers
346                .get(route_match.route_id.as_str())
347                .await
348                .is_some()
349            {
350                // Mark this as a static route for later processing
351                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
352                info!(
353                    correlation_id = %ctx.trace_id,
354                    route_id = %route_match.route_id,
355                    path = %ctx.path,
356                    "Serving static file"
357                );
358                // Return error to avoid upstream connection for static routes
359                return Err(Error::explain(
360                    ErrorType::InternalError,
361                    "Static file serving handled in request_filter",
362                ));
363            }
364        }
365
366        // === Model-based routing (for inference routes) ===
367        // Check if model routing is configured and select upstream based on model
368        let mut model_routing_applied = false;
369        if let Some(ref inference) = route_match.config.inference {
370            if let Some(ref model_routing) = inference.model_routing {
371                // Try to extract model from headers (fast path - no body parsing needed)
372                let model = model_routing::extract_model_from_headers(&req_header.headers);
373
374                if let Some(ref model_name) = model {
375                    // Find upstream for this model
376                    if let Some(routing_result) =
377                        model_routing::find_upstream_for_model(model_routing, model_name)
378                    {
379                        debug!(
380                            correlation_id = %ctx.trace_id,
381                            route_id = %route_match.route_id,
382                            model = %model_name,
383                            upstream = %routing_result.upstream,
384                            is_default = routing_result.is_default,
385                            provider_override = ?routing_result.provider,
386                            "Model-based routing selected upstream"
387                        );
388
389                        ctx.record_model_routing(
390                            &routing_result.upstream,
391                            Some(model_name.clone()),
392                            routing_result.provider,
393                        );
394                        model_routing_applied = true;
395
396                        // Record metrics
397                        if let Some(metrics) = get_model_routing_metrics() {
398                            metrics.record_model_routed(
399                                route_match.route_id.as_str(),
400                                model_name,
401                                &routing_result.upstream,
402                            );
403                            if routing_result.is_default {
404                                metrics.record_default_upstream(route_match.route_id.as_str());
405                            }
406                            if let Some(provider) = routing_result.provider {
407                                metrics.record_provider_override(
408                                    route_match.route_id.as_str(),
409                                    &routing_result.upstream,
410                                    provider.as_str(),
411                                );
412                            }
413                        }
414                    }
415                } else if let Some(ref default_upstream) = model_routing.default_upstream {
416                    // No model in headers, use default upstream
417                    debug!(
418                        correlation_id = %ctx.trace_id,
419                        route_id = %route_match.route_id,
420                        upstream = %default_upstream,
421                        "Model-based routing using default upstream (no model header)"
422                    );
423                    ctx.record_model_routing(default_upstream, None, None);
424                    model_routing_applied = true;
425
426                    // Record metrics for no model header case
427                    if let Some(metrics) = get_model_routing_metrics() {
428                        metrics.record_no_model_header(route_match.route_id.as_str());
429                    }
430                }
431            }
432        }
433
434        // Regular route with upstream (if model routing didn't set it)
435        if !model_routing_applied {
436            if let Some(ref upstream) = route_match.config.upstream {
437                ctx.upstream = Some(upstream.clone());
438                trace!(
439                    correlation_id = %ctx.trace_id,
440                    route_id = %route_match.route_id,
441                    upstream = %upstream,
442                    "Upstream configured for route"
443                );
444            } else {
445                error!(
446                    correlation_id = %ctx.trace_id,
447                    route_id = %route_match.route_id,
448                    "Route has no upstream configured"
449                );
450                return Err(Error::explain(
451                    ErrorType::InternalError,
452                    format!(
453                        "Route '{}' has no upstream configured",
454                        route_match.route_id
455                    ),
456                ));
457            }
458        }
459
460        // === Fallback routing evaluation (pre-request) ===
461        // Check if fallback should be triggered due to health or budget conditions
462        if let Some(ref fallback_config) = route_match.config.fallback {
463            let upstream_name = ctx.upstream.as_ref().unwrap();
464
465            // Check if primary upstream is healthy
466            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
467                pool.has_healthy_targets().await
468            } else {
469                false // Pool not found, treat as unhealthy
470            };
471
472            // Check if budget is exhausted (for inference routes)
473            let is_budget_exhausted = ctx.inference_budget_exhausted;
474
475            // Get model name for model mapping (inference routes)
476            let current_model = ctx.inference_model.as_deref();
477
478            // Create fallback evaluator
479            let evaluator = FallbackEvaluator::new(
480                fallback_config,
481                ctx.tried_upstreams(),
482                ctx.fallback_attempt,
483            );
484
485            // Evaluate pre-request fallback conditions
486            if let Some(decision) = evaluator.should_fallback_before_request(
487                upstream_name,
488                is_healthy,
489                is_budget_exhausted,
490                current_model,
491            ) {
492                info!(
493                    correlation_id = %ctx.trace_id,
494                    route_id = %route_match.route_id,
495                    from_upstream = %upstream_name,
496                    to_upstream = %decision.next_upstream,
497                    reason = %decision.reason,
498                    fallback_attempt = ctx.fallback_attempt + 1,
499                    "Triggering fallback routing"
500                );
501
502                // Record fallback metrics
503                if let Some(metrics) = get_fallback_metrics() {
504                    metrics.record_fallback_attempt(
505                        route_match.route_id.as_str(),
506                        upstream_name,
507                        &decision.next_upstream,
508                        &decision.reason,
509                    );
510                }
511
512                // Record fallback in context
513                ctx.record_fallback(decision.reason, &decision.next_upstream);
514
515                // Apply model mapping if present
516                if let Some((original, mapped)) = decision.model_mapping {
517                    // Record model mapping metrics
518                    if let Some(metrics) = get_fallback_metrics() {
519                        metrics.record_model_mapping(
520                            route_match.route_id.as_str(),
521                            &original,
522                            &mapped,
523                        );
524                    }
525
526                    ctx.record_model_mapping(original, mapped);
527                    trace!(
528                        correlation_id = %ctx.trace_id,
529                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
530                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
531                        "Applied model mapping for fallback"
532                    );
533                }
534            }
535        }
536
537        debug!(
538            correlation_id = %ctx.trace_id,
539            route_id = %route_match.route_id,
540            upstream = ?ctx.upstream,
541            method = %req_header.method,
542            path = %req_header.uri.path(),
543            host = ctx.host.as_deref().unwrap_or("-"),
544            client_ip = %ctx.client_ip,
545            "Processing request"
546        );
547
548        // Get upstream pool (skip for static routes)
549        if ctx
550            .upstream
551            .as_ref()
552            .is_some_and(|u| u.starts_with("_static_"))
553        {
554            // Static routes are handled in request_filter, should not reach here
555            return Err(Error::explain(
556                ErrorType::InternalError,
557                "Static route should be handled in request_filter",
558            ));
559        }
560
561        let upstream_name = ctx
562            .upstream
563            .as_ref()
564            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
565
566        trace!(
567            correlation_id = %ctx.trace_id,
568            upstream = %upstream_name,
569            "Looking up upstream pool"
570        );
571
572        let pool = self
573            .upstream_pools
574            .get(upstream_name)
575            .await
576            .ok_or_else(|| {
577                error!(
578                    correlation_id = %ctx.trace_id,
579                    upstream = %upstream_name,
580                    "Upstream pool not found"
581                );
582                Error::explain(
583                    ErrorType::InternalError,
584                    format!("Upstream pool '{}' not found", upstream_name),
585                )
586            })?;
587
588        // Select peer from pool with retries
589        let max_retries = route_match
590            .config
591            .retry_policy
592            .as_ref()
593            .map(|r| r.max_attempts)
594            .unwrap_or(1);
595
596        trace!(
597            correlation_id = %ctx.trace_id,
598            upstream = %upstream_name,
599            max_retries = max_retries,
600            "Starting upstream peer selection"
601        );
602
603        let mut last_error = None;
604        let selection_start = std::time::Instant::now();
605
606        for attempt in 1..=max_retries {
607            ctx.upstream_attempts = attempt;
608
609            trace!(
610                correlation_id = %ctx.trace_id,
611                upstream = %upstream_name,
612                attempt = attempt,
613                max_retries = max_retries,
614                "Attempting to select upstream peer"
615            );
616
617            match pool.select_peer_with_metadata(None).await {
618                Ok((peer, metadata)) => {
619                    let selection_duration = selection_start.elapsed();
620                    // Store selected peer address for feedback reporting in logging()
621                    let peer_addr = peer.address().to_string();
622                    ctx.selected_upstream_address = Some(peer_addr.clone());
623
624                    // Copy sticky session metadata to context for response_filter
625                    if metadata.contains_key("sticky_session_new") {
626                        ctx.sticky_session_new_assignment = true;
627                        ctx.sticky_session_set_cookie =
628                            metadata.get("sticky_set_cookie_header").cloned();
629                        ctx.sticky_target_index = metadata
630                            .get("sticky_target_index")
631                            .and_then(|s| s.parse().ok());
632
633                        trace!(
634                            correlation_id = %ctx.trace_id,
635                            sticky_target_index = ?ctx.sticky_target_index,
636                            "New sticky session assignment, will set cookie"
637                        );
638                    }
639
640                    debug!(
641                        correlation_id = %ctx.trace_id,
642                        upstream = %upstream_name,
643                        peer_address = %peer_addr,
644                        attempt = attempt,
645                        selection_duration_us = selection_duration.as_micros(),
646                        sticky_session_hit = metadata.contains_key("sticky_session_hit"),
647                        sticky_session_new = ctx.sticky_session_new_assignment,
648                        "Selected upstream peer"
649                    );
650                    return Ok(Box::new(peer));
651                }
652                Err(e) => {
653                    warn!(
654                        correlation_id = %ctx.trace_id,
655                        upstream = %upstream_name,
656                        attempt = attempt,
657                        max_retries = max_retries,
658                        error = %e,
659                        "Failed to select upstream peer"
660                    );
661                    last_error = Some(e);
662
663                    if attempt < max_retries {
664                        // Exponential backoff (using pingora-timeout for efficiency)
665                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
666                        trace!(
667                            correlation_id = %ctx.trace_id,
668                            backoff_ms = backoff.as_millis(),
669                            "Backing off before retry"
670                        );
671                        sleep(backoff).await;
672                    }
673                }
674            }
675        }
676
677        let selection_duration = selection_start.elapsed();
678        error!(
679            correlation_id = %ctx.trace_id,
680            upstream = %upstream_name,
681            attempts = max_retries,
682            selection_duration_ms = selection_duration.as_millis(),
683            last_error = ?last_error,
684            "All upstream selection attempts failed"
685        );
686
687        // Record exhausted metric if fallback was used but all upstreams failed
688        if ctx.used_fallback() {
689            if let Some(metrics) = get_fallback_metrics() {
690                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
691            }
692        }
693
694        Err(Error::explain(
695            ErrorType::InternalError,
696            format!("All upstream attempts failed: {:?}", last_error),
697        ))
698    }
699
700    async fn request_filter(
701        &self,
702        session: &mut Session,
703        ctx: &mut Self::CTX,
704    ) -> Result<bool, Box<Error>> {
705        trace!(
706            correlation_id = %ctx.trace_id,
707            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
708            "Starting request filter phase"
709        );
710
711        // Check rate limiting early (before other processing)
712        // Fast path: skip if no rate limiting is configured for this route
713        if let Some(route_id) = ctx.route_id.as_deref() {
714            if self.rate_limit_manager.has_route_limiter(route_id) {
715                let rate_result = self.rate_limit_manager.check(
716                    route_id,
717                    &ctx.client_ip,
718                    &ctx.path,
719                    Option::<&NoHeaderAccessor>::None,
720                );
721
722                // Store rate limit info for response headers (even if allowed)
723                if rate_result.limit > 0 {
724                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
725                        limit: rate_result.limit,
726                        remaining: rate_result.remaining,
727                        reset_at: rate_result.reset_at,
728                    });
729                }
730
731                if !rate_result.allowed {
732                    use sentinel_config::RateLimitAction;
733
734                    match rate_result.action {
735                        RateLimitAction::Reject => {
736                            warn!(
737                                correlation_id = %ctx.trace_id,
738                                route_id = route_id,
739                                client_ip = %ctx.client_ip,
740                                limiter = %rate_result.limiter,
741                                limit = rate_result.limit,
742                                remaining = rate_result.remaining,
743                                "Request rate limited"
744                            );
745                            self.metrics.record_blocked_request("rate_limited");
746
747                            // Audit log the rate limit
748                            let audit_entry = AuditLogEntry::rate_limited(
749                                &ctx.trace_id,
750                                &ctx.method,
751                                &ctx.path,
752                                &ctx.client_ip,
753                                &rate_result.limiter,
754                            )
755                            .with_route_id(route_id)
756                            .with_status_code(rate_result.status_code);
757                            self.log_manager.log_audit(&audit_entry);
758
759                            // Send rate limit response with headers
760                            let body = rate_result
761                                .message
762                                .unwrap_or_else(|| "Rate limit exceeded".to_string());
763
764                            // Build response with rate limit headers
765                            let retry_after = rate_result.reset_at.saturating_sub(
766                                std::time::SystemTime::now()
767                                    .duration_since(std::time::UNIX_EPOCH)
768                                    .unwrap_or_default()
769                                    .as_secs(),
770                            );
771                            crate::http_helpers::write_rate_limit_error(
772                                session,
773                                rate_result.status_code,
774                                &body,
775                                rate_result.limit,
776                                rate_result.remaining,
777                                rate_result.reset_at,
778                                retry_after,
779                            )
780                            .await?;
781                            return Ok(true); // Request complete, don't continue
782                        }
783                        RateLimitAction::LogOnly => {
784                            debug!(
785                                correlation_id = %ctx.trace_id,
786                                route_id = route_id,
787                                "Rate limit exceeded (log only mode)"
788                            );
789                            // Continue processing
790                        }
791                        RateLimitAction::Delay => {
792                            // Apply delay if suggested by rate limiter
793                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
794                                // Cap delay at the configured maximum
795                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);
796
797                                if actual_delay > 0 {
798                                    debug!(
799                                        correlation_id = %ctx.trace_id,
800                                        route_id = route_id,
801                                        suggested_delay_ms = delay_ms,
802                                        max_delay_ms = rate_result.max_delay_ms,
803                                        actual_delay_ms = actual_delay,
804                                        "Applying rate limit delay"
805                                    );
806
807                                    tokio::time::sleep(std::time::Duration::from_millis(
808                                        actual_delay,
809                                    ))
810                                    .await;
811                                }
812                            }
813                            // Continue processing after delay
814                        }
815                    }
816                }
817            }
818        }
819
820        // Inference rate limiting (token-based, for LLM/AI routes)
821        // This runs after regular rate limiting and checks service type
822        if let Some(route_id) = ctx.route_id.as_deref() {
823            if let Some(ref route_config) = ctx.route_config {
824                if route_config.service_type == sentinel_config::ServiceType::Inference
825                    && self.inference_rate_limit_manager.has_route(route_id)
826                {
827                    // For inference rate limiting, we need access to the request body
828                    // to estimate tokens. We'll use buffered body if available.
829                    let headers = &session.req_header().headers;
830
831                    // Try to get buffered body, or use empty (will estimate from headers only)
832                    let body = ctx.body_buffer.as_slice();
833
834                    // Use client IP as the rate limit key (could be enhanced to use API key header)
835                    let rate_limit_key = &ctx.client_ip;
836
837                    if let Some(check_result) = self.inference_rate_limit_manager.check(
838                        route_id,
839                        rate_limit_key,
840                        headers,
841                        body,
842                    ) {
843                        // Store inference rate limiting context for recording actual tokens later
844                        ctx.inference_rate_limit_enabled = true;
845                        ctx.inference_estimated_tokens = check_result.estimated_tokens;
846                        ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
847                        ctx.inference_model = check_result.model.clone();
848
849                        if !check_result.is_allowed() {
850                            let retry_after_ms = check_result.retry_after_ms();
851                            let retry_after_secs = retry_after_ms.div_ceil(1000);
852
853                            warn!(
854                                correlation_id = %ctx.trace_id,
855                                route_id = route_id,
856                                client_ip = %ctx.client_ip,
857                                estimated_tokens = check_result.estimated_tokens,
858                                model = ?check_result.model,
859                                retry_after_ms = retry_after_ms,
860                                "Inference rate limit exceeded (tokens)"
861                            );
862                            self.metrics.record_blocked_request("inference_rate_limited");
863
864                            // Audit log the token rate limit
865                            let audit_entry = AuditLogEntry::new(
866                                &ctx.trace_id,
867                                AuditEventType::RateLimitExceeded,
868                                &ctx.method,
869                                &ctx.path,
870                                &ctx.client_ip,
871                            )
872                            .with_route_id(route_id)
873                            .with_status_code(429)
874                            .with_reason(format!(
875                                "Token rate limit exceeded: estimated {} tokens, model={:?}",
876                                check_result.estimated_tokens, check_result.model
877                            ));
878                            self.log_manager.log_audit(&audit_entry);
879
880                            // Send 429 response with appropriate headers
881                            let body = "Token rate limit exceeded";
882                            let reset_at = std::time::SystemTime::now()
883                                .duration_since(std::time::UNIX_EPOCH)
884                                .unwrap_or_default()
885                                .as_secs()
886                                + retry_after_secs;
887
888                            // Use simplified error write for inference rate limit
889                            crate::http_helpers::write_rate_limit_error(
890                                session,
891                                429,
892                                body,
893                                0, // No request limit
894                                0, // No remaining
895                                reset_at,
896                                retry_after_secs,
897                            )
898                            .await?;
899                            return Ok(true); // Request complete, don't continue
900                        }
901
902                        trace!(
903                            correlation_id = %ctx.trace_id,
904                            route_id = route_id,
905                            estimated_tokens = check_result.estimated_tokens,
906                            model = ?check_result.model,
907                            "Inference rate limit check passed"
908                        );
909
910                        // Check budget tracking (cumulative per-period limits)
911                        if self.inference_rate_limit_manager.has_budget(route_id) {
912                            ctx.inference_budget_enabled = true;
913
914                            if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
915                                route_id,
916                                rate_limit_key,
917                                check_result.estimated_tokens,
918                            ) {
919                                if !budget_result.is_allowed() {
920                                    let retry_after_secs = budget_result.retry_after_secs();
921
922                                    warn!(
923                                        correlation_id = %ctx.trace_id,
924                                        route_id = route_id,
925                                        client_ip = %ctx.client_ip,
926                                        estimated_tokens = check_result.estimated_tokens,
927                                        retry_after_secs = retry_after_secs,
928                                        "Token budget exhausted"
929                                    );
930
931                                    ctx.inference_budget_exhausted = true;
932                                    self.metrics.record_blocked_request("budget_exhausted");
933
934                                    // Audit log the budget exhaustion
935                                    let audit_entry = AuditLogEntry::new(
936                                        &ctx.trace_id,
937                                        AuditEventType::RateLimitExceeded,
938                                        &ctx.method,
939                                        &ctx.path,
940                                        &ctx.client_ip,
941                                    )
942                                    .with_route_id(route_id)
943                                    .with_status_code(429)
944                                    .with_reason("Token budget exhausted".to_string());
945                                    self.log_manager.log_audit(&audit_entry);
946
947                                    // Send 429 response with budget headers
948                                    let body = "Token budget exhausted";
949                                    let reset_at = std::time::SystemTime::now()
950                                        .duration_since(std::time::UNIX_EPOCH)
951                                        .unwrap_or_default()
952                                        .as_secs()
953                                        + retry_after_secs;
954
955                                    crate::http_helpers::write_rate_limit_error(
956                                        session,
957                                        429,
958                                        body,
959                                        0,
960                                        0,
961                                        reset_at,
962                                        retry_after_secs,
963                                    )
964                                    .await?;
965                                    return Ok(true);
966                                }
967
968                                // Capture budget status for response headers
969                                let remaining = match &budget_result {
970                                    sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
971                                    sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
972                                    _ => 0,
973                                };
974                                ctx.inference_budget_remaining = Some(remaining);
975
976                                // Get period reset time from budget status
977                                if let Some(status) = self.inference_rate_limit_manager.budget_status(
978                                    route_id,
979                                    rate_limit_key,
980                                ) {
981                                    ctx.inference_budget_period_reset = Some(status.period_end);
982                                }
983
984                                trace!(
985                                    correlation_id = %ctx.trace_id,
986                                    route_id = route_id,
987                                    budget_remaining = remaining,
988                                    "Token budget check passed"
989                                );
990                            }
991                        }
992
993                        // Check if cost attribution is enabled
994                        if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
995                            ctx.inference_cost_enabled = true;
996                        }
997                    }
998                }
999            }
1000        }
1001
1002        // Prompt injection guardrail (for inference routes)
1003        if let Some(ref route_config) = ctx.route_config {
1004            if let Some(ref inference) = route_config.inference {
1005                if let Some(ref guardrails) = inference.guardrails {
1006                    if let Some(ref pi_config) = guardrails.prompt_injection {
1007                        if pi_config.enabled && !ctx.body_buffer.is_empty() {
1008                            ctx.guardrails_enabled = true;
1009
1010                            // Extract content from request body
1011                            if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1012                                let result = self
1013                                    .guardrail_processor
1014                                    .check_prompt_injection(
1015                                        pi_config,
1016                                        &content,
1017                                        ctx.inference_model.as_deref(),
1018                                        ctx.route_id.as_deref(),
1019                                        &ctx.trace_id,
1020                                    )
1021                                    .await;
1022
1023                                match result {
1024                                    PromptInjectionResult::Blocked {
1025                                        status,
1026                                        message,
1027                                        detections,
1028                                    } => {
1029                                        warn!(
1030                                            correlation_id = %ctx.trace_id,
1031                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1032                                            detection_count = detections.len(),
1033                                            "Prompt injection detected, blocking request"
1034                                        );
1035
1036                                        self.metrics.record_blocked_request("prompt_injection");
1037
1038                                        // Store detection categories for logging
1039                                        ctx.guardrail_detection_categories = detections
1040                                            .iter()
1041                                            .map(|d| d.category.clone())
1042                                            .collect();
1043
1044                                        // Audit log the block
1045                                        let audit_entry = AuditLogEntry::new(
1046                                            &ctx.trace_id,
1047                                            AuditEventType::Blocked,
1048                                            &ctx.method,
1049                                            &ctx.path,
1050                                            &ctx.client_ip,
1051                                        )
1052                                        .with_route_id(
1053                                            ctx.route_id.as_deref().unwrap_or("unknown"),
1054                                        )
1055                                        .with_status_code(status)
1056                                        .with_reason("Prompt injection detected".to_string());
1057                                        self.log_manager.log_audit(&audit_entry);
1058
1059                                        // Send error response
1060                                        crate::http_helpers::write_json_error(
1061                                            session,
1062                                            status,
1063                                            "prompt_injection_blocked",
1064                                            Some(&message),
1065                                        )
1066                                        .await?;
1067                                        return Ok(true);
1068                                    }
1069                                    PromptInjectionResult::Detected { detections } => {
1070                                        // Log but allow
1071                                        warn!(
1072                                            correlation_id = %ctx.trace_id,
1073                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1074                                            detection_count = detections.len(),
1075                                            "Prompt injection detected (logged only)"
1076                                        );
1077                                        ctx.guardrail_detection_categories = detections
1078                                            .iter()
1079                                            .map(|d| d.category.clone())
1080                                            .collect();
1081                                    }
1082                                    PromptInjectionResult::Warning { detections } => {
1083                                        // Set flag for response header
1084                                        ctx.guardrail_warning = true;
1085                                        ctx.guardrail_detection_categories = detections
1086                                            .iter()
1087                                            .map(|d| d.category.clone())
1088                                            .collect();
1089                                        debug!(
1090                                            correlation_id = %ctx.trace_id,
1091                                            "Prompt injection warning set"
1092                                        );
1093                                    }
1094                                    PromptInjectionResult::Clean => {
1095                                        trace!(
1096                                            correlation_id = %ctx.trace_id,
1097                                            "No prompt injection detected"
1098                                        );
1099                                    }
1100                                    PromptInjectionResult::Error { message } => {
1101                                        // Already logged in processor, just trace here
1102                                        trace!(
1103                                            correlation_id = %ctx.trace_id,
1104                                            error = %message,
1105                                            "Prompt injection check error (failure mode applied)"
1106                                        );
1107                                    }
1108                                }
1109                            }
1110                        }
1111                    }
1112                }
1113            }
1114        }
1115
1116        // Geo filtering
1117        if let Some(route_id) = ctx.route_id.as_deref() {
1118            if let Some(ref route_config) = ctx.route_config {
1119                for filter_id in &route_config.filters {
1120                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1121                        // Store country code for response header
1122                        ctx.geo_country_code = result.country_code.clone();
1123                        ctx.geo_lookup_performed = true;
1124
1125                        if !result.allowed {
1126                            warn!(
1127                                correlation_id = %ctx.trace_id,
1128                                route_id = route_id,
1129                                client_ip = %ctx.client_ip,
1130                                country = ?result.country_code,
1131                                filter_id = %filter_id,
1132                                "Request blocked by geo filter"
1133                            );
1134                            self.metrics.record_blocked_request("geo_blocked");
1135
1136                            // Audit log the geo block
1137                            let audit_entry = AuditLogEntry::new(
1138                                &ctx.trace_id,
1139                                AuditEventType::Blocked,
1140                                &ctx.method,
1141                                &ctx.path,
1142                                &ctx.client_ip,
1143                            )
1144                            .with_route_id(route_id)
1145                            .with_status_code(result.status_code)
1146                            .with_reason(format!(
1147                                "Geo blocked: country={}, filter={}",
1148                                result.country_code.as_deref().unwrap_or("unknown"),
1149                                filter_id
1150                            ));
1151                            self.log_manager.log_audit(&audit_entry);
1152
1153                            // Send geo block response
1154                            let body = result
1155                                .block_message
1156                                .unwrap_or_else(|| "Access denied".to_string());
1157
1158                            crate::http_helpers::write_error(
1159                                session,
1160                                result.status_code,
1161                                &body,
1162                                "text/plain",
1163                            )
1164                            .await?;
1165                            return Ok(true); // Request complete, don't continue
1166                        }
1167
1168                        // Only check first geo filter that matches
1169                        break;
1170                    }
1171                }
1172            }
1173        }
1174
1175        // Check for WebSocket upgrade requests
1176        let is_websocket_upgrade = session
1177            .req_header()
1178            .headers
1179            .get(http::header::UPGRADE)
1180            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1181            .unwrap_or(false);
1182
1183        if is_websocket_upgrade {
1184            ctx.is_websocket_upgrade = true;
1185
1186            // Check if route allows WebSocket upgrades
1187            if let Some(ref route_config) = ctx.route_config {
1188                if !route_config.websocket {
1189                    warn!(
1190                        correlation_id = %ctx.trace_id,
1191                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1192                        client_ip = %ctx.client_ip,
1193                        "WebSocket upgrade rejected: not enabled for route"
1194                    );
1195
1196                    self.metrics.record_blocked_request("websocket_not_enabled");
1197
1198                    // Audit log the rejection
1199                    let audit_entry = AuditLogEntry::new(
1200                        &ctx.trace_id,
1201                        AuditEventType::Blocked,
1202                        &ctx.method,
1203                        &ctx.path,
1204                        &ctx.client_ip,
1205                    )
1206                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1207                    .with_action("websocket_rejected")
1208                    .with_reason("WebSocket not enabled for route");
1209                    self.log_manager.log_audit(&audit_entry);
1210
1211                    // Send 403 Forbidden response
1212                    crate::http_helpers::write_error(
1213                        session,
1214                        403,
1215                        "WebSocket not enabled for this route",
1216                        "text/plain",
1217                    )
1218                    .await?;
1219                    return Ok(true); // Request complete, don't continue
1220                }
1221
1222                debug!(
1223                    correlation_id = %ctx.trace_id,
1224                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1225                    "WebSocket upgrade request allowed"
1226                );
1227
1228                // Check for WebSocket frame inspection
1229                if route_config.websocket_inspection {
1230                    // Check for compression negotiation - skip inspection if permessage-deflate
1231                    let has_compression = session
1232                        .req_header()
1233                        .headers
1234                        .get("Sec-WebSocket-Extensions")
1235                        .and_then(|v| v.to_str().ok())
1236                        .map(|s| s.contains("permessage-deflate"))
1237                        .unwrap_or(false);
1238
1239                    if has_compression {
1240                        debug!(
1241                            correlation_id = %ctx.trace_id,
1242                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1243                            "WebSocket inspection skipped: permessage-deflate negotiated"
1244                        );
1245                        ctx.websocket_skip_inspection = true;
1246                    } else {
1247                        ctx.websocket_inspection_enabled = true;
1248
1249                        // Get agents that handle WebSocketFrame events
1250                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1251                            sentinel_agent_protocol::EventType::WebSocketFrame,
1252                        );
1253
1254                        debug!(
1255                            correlation_id = %ctx.trace_id,
1256                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1257                            agent_count = ctx.websocket_inspection_agents.len(),
1258                            "WebSocket frame inspection enabled"
1259                        );
1260                    }
1261                }
1262            }
1263        }
1264
1265        // Use cached route config from upstream_peer (avoids duplicate route matching)
1266        // Handle static file and builtin routes
1267        if let Some(route_config) = ctx.route_config.clone() {
1268            if route_config.service_type == sentinel_config::ServiceType::Static {
1269                trace!(
1270                    correlation_id = %ctx.trace_id,
1271                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1272                    "Handling static file route"
1273                );
1274                // Create a minimal RouteMatch for the handler
1275                let route_match = crate::routing::RouteMatch {
1276                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1277                    config: route_config.clone(),
1278                };
1279                return self.handle_static_route(session, ctx, &route_match).await;
1280            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1281                trace!(
1282                    correlation_id = %ctx.trace_id,
1283                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1284                    builtin_handler = ?route_config.builtin_handler,
1285                    "Handling builtin route"
1286                );
1287                // Create a minimal RouteMatch for the handler
1288                let route_match = crate::routing::RouteMatch {
1289                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1290                    config: route_config.clone(),
1291                };
1292                return self.handle_builtin_route(session, ctx, &route_match).await;
1293            }
1294        }
1295
1296        // API validation for API routes
1297        if let Some(route_id) = ctx.route_id.clone() {
1298            if let Some(validator) = self.validators.get(&route_id).await {
1299                trace!(
1300                    correlation_id = %ctx.trace_id,
1301                    route_id = %route_id,
1302                    "Running API schema validation"
1303                );
1304                if let Some(result) = self
1305                    .validate_api_request(session, ctx, &route_id, &validator)
1306                    .await?
1307                {
1308                    debug!(
1309                        correlation_id = %ctx.trace_id,
1310                        route_id = %route_id,
1311                        validation_passed = result,
1312                        "API validation complete"
1313                    );
1314                    return Ok(result);
1315                }
1316            }
1317        }
1318
1319        // Get client address before mutable borrow
1320        let client_addr = session
1321            .client_addr()
1322            .map(|a| format!("{}", a))
1323            .unwrap_or_else(|| "unknown".to_string());
1324        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1325
1326        let req_header = session.req_header_mut();
1327
1328        // Add correlation ID header
1329        req_header
1330            .insert_header("X-Correlation-Id", &ctx.trace_id)
1331            .ok();
1332        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1333
1334        // Use cached config (set in upstream_peer, or fetch now if needed)
1335        let config = ctx
1336            .config
1337            .get_or_insert_with(|| self.config_manager.current());
1338
1339        // Enforce header limits (fast path: skip if limits are very high)
1340        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
1341
1342        // Header count check - O(1)
1343        let header_count = req_header.headers.len();
1344        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1345            && header_count > config.limits.max_header_count
1346        {
1347            warn!(
1348                correlation_id = %ctx.trace_id,
1349                header_count = header_count,
1350                limit = config.limits.max_header_count,
1351                "Request blocked: exceeds header count limit"
1352            );
1353
1354            self.metrics.record_blocked_request("header_count_exceeded");
1355            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1356        }
1357
1358        // Header size check - O(n), skip if limit is very high
1359        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1360            let total_header_size: usize = req_header
1361                .headers
1362                .iter()
1363                .map(|(k, v)| k.as_str().len() + v.len())
1364                .sum();
1365
1366            if total_header_size > config.limits.max_header_size_bytes {
1367                warn!(
1368                    correlation_id = %ctx.trace_id,
1369                    header_size = total_header_size,
1370                    limit = config.limits.max_header_size_bytes,
1371                    "Request blocked: exceeds header size limit"
1372                );
1373
1374                self.metrics.record_blocked_request("header_size_exceeded");
1375                return Err(Error::explain(
1376                    ErrorType::InternalError,
1377                    "Headers too large",
1378                ));
1379            }
1380        }
1381
1382        // Process through external agents
1383        trace!(
1384            correlation_id = %ctx.trace_id,
1385            "Processing request through agents"
1386        );
1387        if let Err(e) = self
1388            .process_agents(session, ctx, &client_addr, client_port)
1389            .await
1390        {
1391            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
1392            // In that case, we need to send a proper HTTP response instead of just closing the connection
1393            if let ErrorType::HTTPStatus(status) = e.etype() {
1394                // Extract the message from the error (the context part after "HTTPStatus context:")
1395                let error_msg = e.to_string();
1396                let body = error_msg
1397                    .split("context:")
1398                    .nth(1)
1399                    .map(|s| s.trim())
1400                    .unwrap_or("Request blocked");
1401                debug!(
1402                    correlation_id = %ctx.trace_id,
1403                    status = status,
1404                    body = %body,
1405                    "Sending HTTP error response for agent block"
1406                );
1407                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1408                return Ok(true); // Request complete, don't continue to upstream
1409            }
1410            // For other errors, propagate them
1411            return Err(e);
1412        }
1413
1414        trace!(
1415            correlation_id = %ctx.trace_id,
1416            "Request filter phase complete, forwarding to upstream"
1417        );
1418
1419        Ok(false) // Continue processing
1420    }
1421
1422    /// Process incoming request body chunks.
1423    /// Used for body size enforcement and WAF/agent inspection.
1424    ///
1425    /// Supports two modes:
1426    /// - **Buffer mode** (default): Buffer chunks until end of stream or limit, then send to agents
1427    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
1428    async fn request_body_filter(
1429        &self,
1430        _session: &mut Session,
1431        body: &mut Option<Bytes>,
1432        end_of_stream: bool,
1433        ctx: &mut Self::CTX,
1434    ) -> Result<(), Box<Error>> {
1435        use sentinel_config::BodyStreamingMode;
1436
1437        // Handle WebSocket frame inspection (client -> server)
1438        if ctx.is_websocket_upgrade {
1439            if let Some(ref handler) = ctx.websocket_handler {
1440                let result = handler.process_client_data(body.take()).await;
1441                match result {
1442                    crate::websocket::ProcessResult::Forward(data) => {
1443                        *body = data;
1444                    }
1445                    crate::websocket::ProcessResult::Close(reason) => {
1446                        warn!(
1447                            correlation_id = %ctx.trace_id,
1448                            code = reason.code,
1449                            reason = %reason.reason,
1450                            "WebSocket connection closed by agent (client->server)"
1451                        );
1452                        // Return an error to close the connection
1453                        return Err(Error::explain(
1454                            ErrorType::InternalError,
1455                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
1456                        ));
1457                    }
1458                }
1459            }
1460            // Skip normal body processing for WebSocket
1461            return Ok(());
1462        }
1463
1464        // Track request body size
1465        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1466        if chunk_len > 0 {
1467            ctx.request_body_bytes += chunk_len as u64;
1468
1469            trace!(
1470                correlation_id = %ctx.trace_id,
1471                chunk_size = chunk_len,
1472                total_body_bytes = ctx.request_body_bytes,
1473                end_of_stream = end_of_stream,
1474                streaming_mode = ?ctx.request_body_streaming_mode,
1475                "Processing request body chunk"
1476            );
1477
1478            // Check body size limit (use cached config)
1479            let config = ctx
1480                .config
1481                .get_or_insert_with(|| self.config_manager.current());
1482            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1483                warn!(
1484                    correlation_id = %ctx.trace_id,
1485                    body_bytes = ctx.request_body_bytes,
1486                    limit = config.limits.max_body_size_bytes,
1487                    "Request body size limit exceeded"
1488                );
1489                self.metrics.record_blocked_request("body_size_exceeded");
1490                return Err(Error::explain(
1491                    ErrorType::InternalError,
1492                    "Request body too large",
1493                ));
1494            }
1495        }
1496
1497        // Body inspection for agents (WAF, etc.)
1498        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1499            let config = ctx
1500                .config
1501                .get_or_insert_with(|| self.config_manager.current());
1502            let max_inspection_bytes = config
1503                .waf
1504                .as_ref()
1505                .map(|w| w.body_inspection.max_inspection_bytes as u64)
1506                .unwrap_or(1024 * 1024);
1507
1508            match ctx.request_body_streaming_mode {
1509                BodyStreamingMode::Stream => {
1510                    // Stream mode: send each chunk immediately
1511                    if body.is_some() {
1512                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1513                            .await?;
1514                    } else if end_of_stream && ctx.agent_needs_more {
1515                        // Send final empty chunk to signal end
1516                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1517                            .await?;
1518                    }
1519                }
1520                BodyStreamingMode::Hybrid { buffer_threshold } => {
1521                    // Hybrid mode: buffer up to threshold, then stream
1522                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1523                        // Still in buffering phase
1524                        if let Some(ref chunk) = body {
1525                            let bytes_to_buffer = std::cmp::min(
1526                                chunk.len(),
1527                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1528                            );
1529                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1530                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1531
1532                            // If we've reached threshold or end of stream, switch to streaming
1533                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1534                            {
1535                                // Send buffered content first
1536                                self.send_buffered_body_to_agents(
1537                                    end_of_stream && chunk.len() == bytes_to_buffer,
1538                                    ctx,
1539                                )
1540                                .await?;
1541                                ctx.body_buffer.clear();
1542
1543                                // If there's remaining data in this chunk, stream it
1544                                if bytes_to_buffer < chunk.len() {
1545                                    let remaining = chunk.slice(bytes_to_buffer..);
1546                                    let mut remaining_body = Some(remaining);
1547                                    self.process_body_chunk_streaming(
1548                                        &mut remaining_body,
1549                                        end_of_stream,
1550                                        ctx,
1551                                    )
1552                                    .await?;
1553                                }
1554                            }
1555                        }
1556                    } else {
1557                        // Past threshold, stream directly
1558                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1559                            .await?;
1560                    }
1561                }
1562                BodyStreamingMode::Buffer => {
1563                    // Buffer mode: collect chunks until ready to send
1564                    if let Some(ref chunk) = body {
1565                        if ctx.body_bytes_inspected < max_inspection_bytes {
1566                            let bytes_to_inspect = std::cmp::min(
1567                                chunk.len() as u64,
1568                                max_inspection_bytes - ctx.body_bytes_inspected,
1569                            ) as usize;
1570
1571                            ctx.body_buffer
1572                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1573                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1574
1575                            trace!(
1576                                correlation_id = %ctx.trace_id,
1577                                bytes_inspected = ctx.body_bytes_inspected,
1578                                max_inspection_bytes = max_inspection_bytes,
1579                                buffer_size = ctx.body_buffer.len(),
1580                                "Buffering body for agent inspection"
1581                            );
1582                        }
1583                    }
1584
1585                    // Send when complete or limit reached
1586                    let should_send =
1587                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1588                    if should_send && !ctx.body_buffer.is_empty() {
1589                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1590                            .await?;
1591                        ctx.body_buffer.clear();
1592                    }
1593                }
1594            }
1595        }
1596
1597        if end_of_stream {
1598            trace!(
1599                correlation_id = %ctx.trace_id,
1600                total_body_bytes = ctx.request_body_bytes,
1601                bytes_inspected = ctx.body_bytes_inspected,
1602                "Request body complete"
1603            );
1604        }
1605
1606        Ok(())
1607    }
1608
1609    async fn response_filter(
1610        &self,
1611        _session: &mut Session,
1612        upstream_response: &mut ResponseHeader,
1613        ctx: &mut Self::CTX,
1614    ) -> Result<(), Box<Error>> {
1615        let status = upstream_response.status.as_u16();
1616        let duration = ctx.elapsed();
1617
1618        trace!(
1619            correlation_id = %ctx.trace_id,
1620            status = status,
1621            "Starting response filter phase"
1622        );
1623
1624        // Handle WebSocket 101 Switching Protocols
1625        if status == 101 && ctx.is_websocket_upgrade {
1626            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1627                // Create WebSocket inspector and handler with metrics
1628                let inspector = crate::websocket::WebSocketInspector::with_metrics(
1629                    self.agent_manager.clone(),
1630                    ctx.route_id
1631                        .clone()
1632                        .unwrap_or_else(|| "unknown".to_string()),
1633                    ctx.trace_id.clone(),
1634                    ctx.client_ip.clone(),
1635                    100, // 100ms timeout per frame inspection
1636                    Some(self.metrics.clone()),
1637                );
1638
1639                let handler = crate::websocket::WebSocketHandler::new(
1640                    std::sync::Arc::new(inspector),
1641                    1024 * 1024, // 1MB max frame size
1642                );
1643
1644                ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1645
1646                info!(
1647                    correlation_id = %ctx.trace_id,
1648                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1649                    agent_count = ctx.websocket_inspection_agents.len(),
1650                    "WebSocket upgrade successful, frame inspection enabled"
1651                );
1652            } else if ctx.websocket_skip_inspection {
1653                debug!(
1654                    correlation_id = %ctx.trace_id,
1655                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1656                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1657                );
1658            } else {
1659                debug!(
1660                    correlation_id = %ctx.trace_id,
1661                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1662                    "WebSocket upgrade successful"
1663                );
1664            }
1665        }
1666
1667        // Apply security headers
1668        trace!(
1669            correlation_id = %ctx.trace_id,
1670            "Applying security headers"
1671        );
1672        self.apply_security_headers(upstream_response).ok();
1673
1674        // Add correlation ID to response
1675        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1676
1677        // Add rate limit headers if rate limiting was applied
1678        if let Some(ref rate_info) = ctx.rate_limit_info {
1679            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1680            upstream_response
1681                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1682            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1683        }
1684
1685        // Add token budget headers if budget tracking was enabled
1686        if ctx.inference_budget_enabled {
1687            if let Some(remaining) = ctx.inference_budget_remaining {
1688                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1689            }
1690            if let Some(period_reset) = ctx.inference_budget_period_reset {
1691                // Format as ISO 8601 timestamp
1692                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1693                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1694                    .unwrap_or_else(|| period_reset.to_string());
1695                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1696            }
1697        }
1698
1699        // Add GeoIP country header if geo lookup was performed
1700        if let Some(ref country_code) = ctx.geo_country_code {
1701            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1702        }
1703
1704        // Add sticky session cookie if a new assignment was made
1705        if ctx.sticky_session_new_assignment {
1706            if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1707                upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1708                trace!(
1709                    correlation_id = %ctx.trace_id,
1710                    sticky_target_index = ?ctx.sticky_target_index,
1711                    "Added sticky session Set-Cookie header"
1712                );
1713            }
1714        }
1715
1716        // Add guardrail warning header if prompt injection was detected (warn mode)
1717        if ctx.guardrail_warning {
1718            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1719        }
1720
1721        // Add fallback routing headers if fallback was used
1722        if ctx.used_fallback() {
1723            upstream_response.insert_header("X-Fallback-Used", "true")?;
1724
1725            if let Some(ref upstream) = ctx.upstream {
1726                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1727            }
1728
1729            if let Some(ref reason) = ctx.fallback_reason {
1730                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1731            }
1732
1733            if let Some(ref original) = ctx.original_upstream {
1734                upstream_response.insert_header("X-Original-Upstream", original)?;
1735            }
1736
1737            if let Some(ref mapping) = ctx.model_mapping_applied {
1738                upstream_response.insert_header(
1739                    "X-Model-Mapping",
1740                    format!("{} -> {}", mapping.0, mapping.1),
1741                )?;
1742            }
1743
1744            trace!(
1745                correlation_id = %ctx.trace_id,
1746                fallback_attempt = ctx.fallback_attempt,
1747                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1748                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1749                "Added fallback response headers"
1750            );
1751
1752            // Record fallback success metrics for successful responses (2xx/3xx)
1753            if status < 400 {
1754                if let Some(metrics) = get_fallback_metrics() {
1755                    metrics.record_fallback_success(
1756                        ctx.route_id.as_deref().unwrap_or("unknown"),
1757                        ctx.upstream.as_deref().unwrap_or("unknown"),
1758                    );
1759                }
1760            }
1761        }
1762
1763        // Initialize streaming token counter for SSE responses on inference routes
1764        if ctx.inference_rate_limit_enabled {
1765            // Check if this is an SSE response
1766            let content_type = upstream_response
1767                .headers
1768                .get("content-type")
1769                .and_then(|ct| ct.to_str().ok());
1770
1771            if is_sse_response(content_type) {
1772                // Get provider from route config
1773                let provider = ctx
1774                    .route_config
1775                    .as_ref()
1776                    .and_then(|r| r.inference.as_ref())
1777                    .map(|i| i.provider.clone())
1778                    .unwrap_or_default();
1779
1780                ctx.inference_streaming_response = true;
1781                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1782                    provider,
1783                    ctx.inference_model.clone(),
1784                ));
1785
1786                trace!(
1787                    correlation_id = %ctx.trace_id,
1788                    content_type = ?content_type,
1789                    model = ?ctx.inference_model,
1790                    "Initialized streaming token counter for SSE response"
1791                );
1792            }
1793        }
1794
1795        // Generate custom error pages for error responses
1796        if status >= 400 {
1797            trace!(
1798                correlation_id = %ctx.trace_id,
1799                status = status,
1800                "Handling error response"
1801            );
1802            self.handle_error_response(upstream_response, ctx).await?;
1803        }
1804
1805        // Record metrics
1806        self.metrics.record_request(
1807            ctx.route_id.as_deref().unwrap_or("unknown"),
1808            &ctx.method,
1809            status,
1810            duration,
1811        );
1812
1813        // Record OpenTelemetry span status
1814        if let Some(ref mut span) = ctx.otel_span {
1815            span.set_status(status);
1816            if let Some(ref upstream) = ctx.upstream {
1817                span.set_upstream(upstream, "");
1818            }
1819            if status >= 500 {
1820                span.record_error(&format!("HTTP {}", status));
1821            }
1822        }
1823
1824        // Record passive health check
1825        if let Some(ref upstream) = ctx.upstream {
1826            let success = status < 500;
1827
1828            trace!(
1829                correlation_id = %ctx.trace_id,
1830                upstream = %upstream,
1831                success = success,
1832                status = status,
1833                "Recording passive health check result"
1834            );
1835
1836            let error_msg = if !success {
1837                Some(format!("HTTP {}", status))
1838            } else {
1839                None
1840            };
1841            self.passive_health
1842                .record_outcome(upstream, success, error_msg.as_deref())
1843                .await;
1844
1845            // Report to upstream pool
1846            if let Some(pool) = self.upstream_pools.get(upstream).await {
1847                pool.report_result(upstream, success).await;
1848            }
1849
1850            if !success {
1851                warn!(
1852                    correlation_id = %ctx.trace_id,
1853                    upstream = %upstream,
1854                    status = status,
1855                    "Upstream returned error status"
1856                );
1857            }
1858        }
1859
1860        // Final request completion log
1861        if status >= 500 {
1862            error!(
1863                correlation_id = %ctx.trace_id,
1864                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1865                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1866                method = %ctx.method,
1867                path = %ctx.path,
1868                status = status,
1869                duration_ms = duration.as_millis(),
1870                attempts = ctx.upstream_attempts,
1871                "Request completed with server error"
1872            );
1873        } else if status >= 400 {
1874            warn!(
1875                correlation_id = %ctx.trace_id,
1876                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1877                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1878                method = %ctx.method,
1879                path = %ctx.path,
1880                status = status,
1881                duration_ms = duration.as_millis(),
1882                "Request completed with client error"
1883            );
1884        } else {
1885            debug!(
1886                correlation_id = %ctx.trace_id,
1887                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1888                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1889                method = %ctx.method,
1890                path = %ctx.path,
1891                status = status,
1892                duration_ms = duration.as_millis(),
1893                attempts = ctx.upstream_attempts,
1894                "Request completed"
1895            );
1896        }
1897
1898        Ok(())
1899    }
1900
1901    /// Modify the request before sending to upstream.
1902    /// Used for header modifications, adding authentication, etc.
1903    async fn upstream_request_filter(
1904        &self,
1905        _session: &mut Session,
1906        upstream_request: &mut pingora::http::RequestHeader,
1907        ctx: &mut Self::CTX,
1908    ) -> Result<()>
1909    where
1910        Self::CTX: Send + Sync,
1911    {
1912        trace!(
1913            correlation_id = %ctx.trace_id,
1914            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1915            "Applying upstream request modifications"
1916        );
1917
1918        // Add trace ID header for upstream correlation
1919        upstream_request
1920            .insert_header("X-Trace-Id", &ctx.trace_id)
1921            .ok();
1922
1923        // Add W3C traceparent header for distributed tracing
1924        if let Some(ref span) = ctx.otel_span {
1925            let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1926            let traceparent =
1927                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1928            upstream_request
1929                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1930                .ok();
1931        }
1932
1933        // Add request metadata headers
1934        upstream_request
1935            .insert_header("X-Forwarded-By", "Sentinel")
1936            .ok();
1937
1938        // Apply route-specific request header modifications
1939        // Clone the modifications to avoid lifetime issues with the header API
1940        if let Some(ref route_config) = ctx.route_config {
1941            let mods = route_config.policies.request_headers.clone();
1942
1943            // Set headers (overwrite existing)
1944            for (name, value) in mods.set {
1945                upstream_request.insert_header(name, value).ok();
1946            }
1947
1948            // Add headers (append)
1949            for (name, value) in mods.add {
1950                upstream_request.append_header(name, value).ok();
1951            }
1952
1953            // Remove headers
1954            for name in &mods.remove {
1955                upstream_request.remove_header(name);
1956            }
1957
1958            trace!(
1959                correlation_id = %ctx.trace_id,
1960                "Applied request header modifications"
1961            );
1962        }
1963
1964        // Remove sensitive headers that shouldn't go to upstream
1965        upstream_request.remove_header("X-Internal-Token");
1966        upstream_request.remove_header("Authorization-Internal");
1967
1968        // === Traffic Mirroring / Shadowing ===
1969        // Check if this route has shadow configuration
1970        if let Some(ref route_config) = ctx.route_config {
1971            if let Some(ref shadow_config) = route_config.shadow {
1972                // Get snapshot of upstream pools for shadow manager
1973                let pools_snapshot = self.upstream_pools.snapshot().await;
1974                let upstream_pools = std::sync::Arc::new(pools_snapshot);
1975
1976                // Get route ID for metrics labeling
1977                let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1978
1979                // Create shadow manager
1980                let shadow_manager = crate::shadow::ShadowManager::new(
1981                    upstream_pools,
1982                    shadow_config.clone(),
1983                    Some(std::sync::Arc::clone(&self.metrics)),
1984                    route_id,
1985                );
1986
1987                // Check if we should shadow this request (sampling + header check)
1988                if shadow_manager.should_shadow(upstream_request) {
1989                    trace!(
1990                        correlation_id = %ctx.trace_id,
1991                        shadow_upstream = %shadow_config.upstream,
1992                        percentage = shadow_config.percentage,
1993                        "Shadowing request"
1994                    );
1995
1996                    // Clone headers for shadow request
1997                    let shadow_headers = upstream_request.clone();
1998
1999                    // Create request context for shadow (simplified from proxy context)
2000                    let shadow_ctx = crate::upstream::RequestContext {
2001                        client_ip: ctx.client_ip.parse().ok(),
2002                        headers: std::collections::HashMap::new(), // Empty for now
2003                        path: ctx.path.clone(),
2004                        method: ctx.method.clone(),
2005                    };
2006
2007                    // Determine if we should buffer the body
2008                    let buffer_body = shadow_config.buffer_body
2009                        && crate::shadow::should_buffer_method(&ctx.method);
2010
2011                    if buffer_body {
2012                        // Body buffering requested - defer shadow request until body is available
2013                        // Store shadow info in context; will be fired in logging phase
2014                        // or when request body filter completes
2015                        trace!(
2016                            correlation_id = %ctx.trace_id,
2017                            "Deferring shadow request until body is buffered"
2018                        );
2019                        ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2020                            headers: shadow_headers,
2021                            manager: std::sync::Arc::new(shadow_manager),
2022                            request_ctx: shadow_ctx,
2023                            include_body: true,
2024                        });
2025                        // Enable body inspection to capture the body for shadow
2026                        // (only if not already enabled for other reasons)
2027                        if !ctx.body_inspection_enabled {
2028                            ctx.body_inspection_enabled = true;
2029                            // Set a reasonable buffer limit from shadow config
2030                            // (body_buffer will accumulate chunks)
2031                        }
2032                    } else {
2033                        // No body buffering needed - fire shadow request immediately
2034                        shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2035                        ctx.shadow_sent = true;
2036                    }
2037                }
2038            }
2039        }
2040
2041        Ok(())
2042    }
2043
2044    /// Process response body chunks from upstream.
2045    /// Used for response size tracking and WAF inspection.
2046    ///
2047    /// Note: Response body inspection is currently buffered only (streaming mode not supported
2048    /// for responses due to Pingora's synchronous filter design).
2049    fn response_body_filter(
2050        &self,
2051        _session: &mut Session,
2052        body: &mut Option<Bytes>,
2053        end_of_stream: bool,
2054        ctx: &mut Self::CTX,
2055    ) -> Result<Option<Duration>, Box<Error>> {
2056        // Handle WebSocket frame inspection (server -> client)
2057        // Note: This filter is synchronous, so we use block_in_place for async agent calls
2058        if ctx.is_websocket_upgrade {
2059            if let Some(ref handler) = ctx.websocket_handler {
2060                let handler = handler.clone();
2061                let data = body.take();
2062
2063                // Use block_in_place to run async handler from sync context
2064                // This is safe because Pingora uses a multi-threaded tokio runtime
2065                let result = tokio::task::block_in_place(|| {
2066                    tokio::runtime::Handle::current()
2067                        .block_on(async { handler.process_server_data(data).await })
2068                });
2069
2070                match result {
2071                    crate::websocket::ProcessResult::Forward(data) => {
2072                        *body = data;
2073                    }
2074                    crate::websocket::ProcessResult::Close(reason) => {
2075                        warn!(
2076                            correlation_id = %ctx.trace_id,
2077                            code = reason.code,
2078                            reason = %reason.reason,
2079                            "WebSocket connection closed by agent (server->client)"
2080                        );
2081                        // For sync filter, we can't return an error that closes the connection
2082                        // Instead, inject a close frame
2083                        let close_frame =
2084                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2085                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2086                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2087                            *body = Some(Bytes::from(encoded));
2088                        }
2089                    }
2090                }
2091            }
2092            // Skip normal body processing for WebSocket
2093            return Ok(None);
2094        }
2095
2096        // Track response body size
2097        if let Some(ref chunk) = body {
2098            ctx.response_bytes += chunk.len() as u64;
2099
2100            trace!(
2101                correlation_id = %ctx.trace_id,
2102                chunk_size = chunk.len(),
2103                total_response_bytes = ctx.response_bytes,
2104                end_of_stream = end_of_stream,
2105                "Processing response body chunk"
2106            );
2107
2108            // Process SSE chunks for streaming token counting
2109            if let Some(ref mut counter) = ctx.inference_streaming_counter {
2110                let result = counter.process_chunk(chunk);
2111
2112                if result.content.is_some() || result.is_done {
2113                    trace!(
2114                        correlation_id = %ctx.trace_id,
2115                        has_content = result.content.is_some(),
2116                        is_done = result.is_done,
2117                        chunks_processed = counter.chunks_processed(),
2118                        accumulated_content_len = counter.content().len(),
2119                        "Processed SSE chunk for token counting"
2120                    );
2121                }
2122            }
2123
2124            // Response body inspection (buffered mode only)
2125            // Note: Streaming mode for response bodies is not currently supported
2126            // due to Pingora's synchronous response_body_filter design
2127            if ctx.response_body_inspection_enabled
2128                && !ctx.response_body_inspection_agents.is_empty()
2129            {
2130                let config = ctx
2131                    .config
2132                    .get_or_insert_with(|| self.config_manager.current());
2133                let max_inspection_bytes = config
2134                    .waf
2135                    .as_ref()
2136                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
2137                    .unwrap_or(1024 * 1024);
2138
2139                if ctx.response_body_bytes_inspected < max_inspection_bytes {
2140                    let bytes_to_inspect = std::cmp::min(
2141                        chunk.len() as u64,
2142                        max_inspection_bytes - ctx.response_body_bytes_inspected,
2143                    ) as usize;
2144
2145                    // Buffer for later processing (during logging phase)
2146                    // Response body inspection happens asynchronously and results
2147                    // are logged rather than blocking the response
2148                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2149                    ctx.response_body_chunk_index += 1;
2150
2151                    trace!(
2152                        correlation_id = %ctx.trace_id,
2153                        bytes_inspected = ctx.response_body_bytes_inspected,
2154                        max_inspection_bytes = max_inspection_bytes,
2155                        chunk_index = ctx.response_body_chunk_index,
2156                        "Tracking response body for inspection"
2157                    );
2158                }
2159            }
2160        }
2161
2162        if end_of_stream {
2163            trace!(
2164                correlation_id = %ctx.trace_id,
2165                total_response_bytes = ctx.response_bytes,
2166                response_bytes_inspected = ctx.response_body_bytes_inspected,
2167                "Response body complete"
2168            );
2169        }
2170
2171        // Return None to indicate no delay needed
2172        Ok(None)
2173    }
2174
2175    /// Called when a connection to upstream is established or reused.
2176    /// Logs connection reuse statistics for observability.
2177    async fn connected_to_upstream(
2178        &self,
2179        _session: &mut Session,
2180        reused: bool,
2181        peer: &HttpPeer,
2182        #[cfg(unix)] _fd: RawFd,
2183        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2184        digest: Option<&Digest>,
2185        ctx: &mut Self::CTX,
2186    ) -> Result<(), Box<Error>> {
2187        // Track connection reuse for metrics
2188        ctx.connection_reused = reused;
2189
2190        // Log connection establishment/reuse
2191        if reused {
2192            trace!(
2193                correlation_id = %ctx.trace_id,
2194                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2195                peer_address = %peer.address(),
2196                "Reusing existing upstream connection"
2197            );
2198        } else {
2199            debug!(
2200                correlation_id = %ctx.trace_id,
2201                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2202                peer_address = %peer.address(),
2203                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2204                "Established new upstream connection"
2205            );
2206        }
2207
2208        Ok(())
2209    }
2210
2211    // =========================================================================
2212    // HTTP Caching - Pingora Cache Integration
2213    // =========================================================================
2214
2215    /// Decide if the request should use caching.
2216    ///
2217    /// This method is called early in the request lifecycle to determine if
2218    /// the response should be served from cache or if the response should
2219    /// be cached.
2220    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2221        // Check if route has caching enabled
2222        let route_id = match ctx.route_id.as_deref() {
2223            Some(id) => id,
2224            None => {
2225                trace!(
2226                    correlation_id = %ctx.trace_id,
2227                    "Cache filter: no route ID, skipping cache"
2228                );
2229                return Ok(());
2230            }
2231        };
2232
2233        // Check if caching is enabled for this route
2234        if !self.cache_manager.is_enabled(route_id) {
2235            trace!(
2236                correlation_id = %ctx.trace_id,
2237                route_id = %route_id,
2238                "Cache disabled for route"
2239            );
2240            return Ok(());
2241        }
2242
2243        // Check if method is cacheable (typically GET/HEAD)
2244        if !self
2245            .cache_manager
2246            .is_method_cacheable(route_id, &ctx.method)
2247        {
2248            trace!(
2249                correlation_id = %ctx.trace_id,
2250                route_id = %route_id,
2251                method = %ctx.method,
2252                "Method not cacheable"
2253            );
2254            return Ok(());
2255        }
2256
2257        // Enable caching for this request using Pingora's cache infrastructure
2258        debug!(
2259            correlation_id = %ctx.trace_id,
2260            route_id = %route_id,
2261            method = %ctx.method,
2262            path = %ctx.path,
2263            "Enabling HTTP caching for request"
2264        );
2265
2266        // Get static references to cache infrastructure
2267        let storage = get_cache_storage();
2268        let eviction = get_cache_eviction();
2269        let cache_lock = get_cache_lock();
2270
2271        // Enable the cache with storage, eviction, and lock
2272        session.cache.enable(
2273            storage,
2274            Some(eviction),
2275            None, // predictor - optional
2276            Some(cache_lock),
2277            None, // option overrides
2278        );
2279
2280        // Mark request as cache-eligible in context
2281        ctx.cache_eligible = true;
2282
2283        trace!(
2284            correlation_id = %ctx.trace_id,
2285            route_id = %route_id,
2286            cache_enabled = session.cache.enabled(),
2287            "Cache enabled for request"
2288        );
2289
2290        Ok(())
2291    }
2292
2293    /// Generate the cache key for this request.
2294    ///
2295    /// The cache key uniquely identifies the cached response. It typically
2296    /// includes the method, host, path, and potentially query parameters.
2297    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2298        let req_header = session.req_header();
2299        let method = req_header.method.as_str();
2300        let path = req_header.uri.path();
2301        let host = ctx.host.as_deref().unwrap_or("unknown");
2302        let query = req_header.uri.query();
2303
2304        // Generate cache key using our cache manager
2305        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2306
2307        trace!(
2308            correlation_id = %ctx.trace_id,
2309            cache_key = %key_string,
2310            "Generated cache key"
2311        );
2312
2313        // Use Pingora's default cache key generator which handles
2314        // proper hashing and internal format
2315        Ok(CacheKey::default(req_header))
2316    }
2317
2318    /// Called when a cache miss occurs.
2319    ///
2320    /// This is called when the cache lookup found no matching entry.
2321    /// We can use this to log and track cache misses.
2322    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2323        // Let Pingora handle the cache miss
2324        session.cache.cache_miss();
2325
2326        // Track statistics
2327        if let Some(route_id) = ctx.route_id.as_deref() {
2328            self.cache_manager.stats().record_miss();
2329
2330            trace!(
2331                correlation_id = %ctx.trace_id,
2332                route_id = %route_id,
2333                path = %ctx.path,
2334                "Cache miss"
2335            );
2336        }
2337    }
2338
2339    /// Called after a successful cache lookup.
2340    ///
2341    /// This filter allows inspecting the cached response before serving it.
2342    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
2343    /// to invalidate and refetch.
2344    async fn cache_hit_filter(
2345        &self,
2346        session: &mut Session,
2347        meta: &CacheMeta,
2348        _hit_handler: &mut HitHandler,
2349        is_fresh: bool,
2350        ctx: &mut Self::CTX,
2351    ) -> Result<Option<ForcedInvalidationKind>>
2352    where
2353        Self::CTX: Send + Sync,
2354    {
2355        // Check if this cache entry should be invalidated due to a purge request
2356        let req_header = session.req_header();
2357        let method = req_header.method.as_str();
2358        let path = req_header.uri.path();
2359        let host = req_header.uri.host().unwrap_or("localhost");
2360        let query = req_header.uri.query();
2361
2362        // Generate the cache key for this request
2363        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2364
2365        // Check if this key should be invalidated
2366        if self.cache_manager.should_invalidate(&cache_key) {
2367            info!(
2368                correlation_id = %ctx.trace_id,
2369                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2370                cache_key = %cache_key,
2371                "Cache entry invalidated by purge request"
2372            );
2373            // Force expiration so the entry is refetched from upstream
2374            return Ok(Some(ForcedInvalidationKind::ForceExpired));
2375        }
2376
2377        // Track cache hit statistics
2378        if is_fresh {
2379            self.cache_manager.stats().record_hit();
2380
2381            debug!(
2382                correlation_id = %ctx.trace_id,
2383                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2384                is_fresh = is_fresh,
2385                "Cache hit (fresh)"
2386            );
2387        } else {
2388            trace!(
2389                correlation_id = %ctx.trace_id,
2390                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2391                is_fresh = is_fresh,
2392                "Cache hit (stale)"
2393            );
2394        }
2395
2396        // Serve the cached response without invalidation
2397        Ok(None)
2398    }
2399
2400    /// Decide if the response should be cached.
2401    ///
2402    /// Called after receiving the response from upstream to determine
2403    /// if it should be stored in the cache.
2404    fn response_cache_filter(
2405        &self,
2406        _session: &Session,
2407        resp: &ResponseHeader,
2408        ctx: &mut Self::CTX,
2409    ) -> Result<RespCacheable> {
2410        let route_id = match ctx.route_id.as_deref() {
2411            Some(id) => id,
2412            None => {
2413                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2414                    "no_route",
2415                )));
2416            }
2417        };
2418
2419        // Check if caching is enabled for this route
2420        if !self.cache_manager.is_enabled(route_id) {
2421            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2422                "disabled",
2423            )));
2424        }
2425
2426        let status = resp.status.as_u16();
2427
2428        // Check if status code is cacheable
2429        if !self.cache_manager.is_status_cacheable(route_id, status) {
2430            trace!(
2431                correlation_id = %ctx.trace_id,
2432                route_id = %route_id,
2433                status = status,
2434                "Status code not cacheable"
2435            );
2436            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2437        }
2438
2439        // Check Cache-Control header for no-store, no-cache, private
2440        if let Some(cache_control) = resp.headers.get("cache-control") {
2441            if let Ok(cc_str) = cache_control.to_str() {
2442                if crate::cache::CacheManager::is_no_cache(cc_str) {
2443                    trace!(
2444                        correlation_id = %ctx.trace_id,
2445                        route_id = %route_id,
2446                        cache_control = %cc_str,
2447                        "Response has no-cache directive"
2448                    );
2449                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2450                }
2451            }
2452        }
2453
2454        // Calculate TTL from Cache-Control or use default
2455        let cache_control = resp
2456            .headers
2457            .get("cache-control")
2458            .and_then(|v| v.to_str().ok());
2459        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2460
2461        if ttl.is_zero() {
2462            trace!(
2463                correlation_id = %ctx.trace_id,
2464                route_id = %route_id,
2465                "TTL is zero, not caching"
2466            );
2467            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2468        }
2469
2470        // Get route cache config for stale settings
2471        let config = self
2472            .cache_manager
2473            .get_route_config(route_id)
2474            .unwrap_or_default();
2475
2476        // Create timestamps for cache metadata
2477        let now = std::time::SystemTime::now();
2478        let fresh_until = now + ttl;
2479
2480        // Clone the response header for storage
2481        let header = resp.clone();
2482
2483        // Create CacheMeta with proper timestamps and TTLs
2484        let cache_meta = CacheMeta::new(
2485            fresh_until,
2486            now,
2487            config.stale_while_revalidate_secs as u32,
2488            config.stale_if_error_secs as u32,
2489            header,
2490        );
2491
2492        // Track the cache store
2493        self.cache_manager.stats().record_store();
2494
2495        debug!(
2496            correlation_id = %ctx.trace_id,
2497            route_id = %route_id,
2498            status = status,
2499            ttl_secs = ttl.as_secs(),
2500            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2501            stale_if_error_secs = config.stale_if_error_secs,
2502            "Caching response"
2503        );
2504
2505        Ok(RespCacheable::Cacheable(cache_meta))
2506    }
2507
2508    /// Decide whether to serve stale content on error or during revalidation.
2509    ///
2510    /// This implements stale-while-revalidate and stale-if-error semantics.
2511    fn should_serve_stale(
2512        &self,
2513        _session: &mut Session,
2514        ctx: &mut Self::CTX,
2515        error: Option<&Error>,
2516    ) -> bool {
2517        let route_id = match ctx.route_id.as_deref() {
2518            Some(id) => id,
2519            None => return false,
2520        };
2521
2522        // Get route cache config for stale settings
2523        let config = match self.cache_manager.get_route_config(route_id) {
2524            Some(c) => c,
2525            None => return false,
2526        };
2527
2528        // If there's an upstream error, use stale-if-error
2529        if let Some(e) = error {
2530            // Only serve stale for upstream errors
2531            if e.esource() == &pingora::ErrorSource::Upstream {
2532                debug!(
2533                    correlation_id = %ctx.trace_id,
2534                    route_id = %route_id,
2535                    error = %e,
2536                    stale_if_error_secs = config.stale_if_error_secs,
2537                    "Considering stale-if-error"
2538                );
2539                return config.stale_if_error_secs > 0;
2540            }
2541        }
2542
2543        // During stale-while-revalidate (error is None)
2544        if error.is_none() && config.stale_while_revalidate_secs > 0 {
2545            trace!(
2546                correlation_id = %ctx.trace_id,
2547                route_id = %route_id,
2548                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2549                "Allowing stale-while-revalidate"
2550            );
2551            return true;
2552        }
2553
2554        false
2555    }
2556
2557    /// Handle Range header for byte-range requests (streaming support).
2558    ///
2559    /// This method is called when a Range header is present in the request.
2560    /// It allows proper handling of:
2561    /// - Video streaming (HTML5 video seeking)
2562    /// - Large file downloads with resume support
2563    /// - Partial content delivery
2564    ///
2565    /// Uses Pingora's built-in range handling with route-specific logging.
2566    fn range_header_filter(
2567        &self,
2568        session: &mut Session,
2569        response: &mut ResponseHeader,
2570        ctx: &mut Self::CTX,
2571    ) -> pingora_proxy::RangeType
2572    where
2573        Self::CTX: Send + Sync,
2574    {
2575        // Check if route supports range requests
2576        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2577            // Static file routes and media routes should support range requests
2578            matches!(
2579                config.service_type,
2580                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2581            )
2582        });
2583
2584        if !supports_range {
2585            trace!(
2586                correlation_id = %ctx.trace_id,
2587                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2588                "Range request not supported for this route type"
2589            );
2590            return pingora_proxy::RangeType::None;
2591        }
2592
2593        // Use Pingora's built-in range header parsing and handling
2594        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
2595
2596        match &range_type {
2597            pingora_proxy::RangeType::None => {
2598                trace!(
2599                    correlation_id = %ctx.trace_id,
2600                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2601                    "No range request or not applicable"
2602                );
2603            }
2604            pingora_proxy::RangeType::Single(range) => {
2605                trace!(
2606                    correlation_id = %ctx.trace_id,
2607                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2608                    range_start = range.start,
2609                    range_end = range.end,
2610                    "Processing single-range request"
2611                );
2612            }
2613            pingora_proxy::RangeType::Multi(multi) => {
2614                trace!(
2615                    correlation_id = %ctx.trace_id,
2616                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2617                    range_count = multi.ranges.len(),
2618                    "Processing multi-range request"
2619                );
2620            }
2621            pingora_proxy::RangeType::Invalid => {
2622                debug!(
2623                    correlation_id = %ctx.trace_id,
2624                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2625                    "Invalid range header"
2626                );
2627            }
2628        }
2629
2630        range_type
2631    }
2632
2633    /// Handle fatal proxy errors by generating custom error pages.
2634    /// Called when the proxy itself fails to process the request.
2635    async fn fail_to_proxy(
2636        &self,
2637        session: &mut Session,
2638        e: &Error,
2639        ctx: &mut Self::CTX,
2640    ) -> pingora_proxy::FailToProxy
2641    where
2642        Self::CTX: Send + Sync,
2643    {
2644        let error_code = match e.etype() {
2645            // Connection errors
2646            ErrorType::ConnectRefused => 503,
2647            ErrorType::ConnectTimedout => 504,
2648            ErrorType::ConnectNoRoute => 502,
2649
2650            // Timeout errors
2651            ErrorType::ReadTimedout => 504,
2652            ErrorType::WriteTimedout => 504,
2653
2654            // TLS errors
2655            ErrorType::TLSHandshakeFailure => 502,
2656            ErrorType::InvalidCert => 502,
2657
2658            // Protocol errors
2659            ErrorType::InvalidHTTPHeader => 400,
2660            ErrorType::H2Error => 502,
2661
2662            // Resource errors
2663            ErrorType::ConnectProxyFailure => 502,
2664            ErrorType::ConnectionClosed => 502,
2665
2666            // Explicit HTTP status (e.g., from agent fail-closed blocking)
2667            ErrorType::HTTPStatus(status) => *status,
2668
2669            // Internal errors - return 502 for upstream issues (more accurate than 500)
2670            ErrorType::InternalError => {
2671                // Check if this is an upstream-related error
2672                let error_str = e.to_string();
2673                if error_str.contains("upstream")
2674                    || error_str.contains("DNS")
2675                    || error_str.contains("resolve")
2676                {
2677                    502
2678                } else {
2679                    500
2680                }
2681            }
2682
2683            // Default to 502 for unknown errors
2684            _ => 502,
2685        };
2686
2687        error!(
2688            correlation_id = %ctx.trace_id,
2689            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2690            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2691            error_type = ?e.etype(),
2692            error = %e,
2693            error_code = error_code,
2694            "Proxy error occurred"
2695        );
2696
2697        // Record the error in metrics
2698        self.metrics
2699            .record_blocked_request(&format!("proxy_error_{}", error_code));
2700
2701        // Write error response to ensure client receives a proper HTTP response
2702        // This is necessary because some errors occur before the upstream connection
2703        // is established, and Pingora may not send a response automatically
2704        let error_message = match error_code {
2705            400 => "Bad Request",
2706            502 => "Bad Gateway",
2707            503 => "Service Unavailable",
2708            504 => "Gateway Timeout",
2709            _ => "Internal Server Error",
2710        };
2711
2712        // Build a minimal error response body
2713        let body = format!(
2714            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2715            error_code, error_message, ctx.trace_id
2716        );
2717
2718        // Write the response header
2719        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2720        header
2721            .insert_header("Content-Type", "application/json")
2722            .ok();
2723        header
2724            .insert_header("Content-Length", body.len().to_string())
2725            .ok();
2726        header
2727            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2728            .ok();
2729        header.insert_header("Connection", "close").ok();
2730
2731        // Write headers and body
2732        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2733            warn!(
2734                correlation_id = %ctx.trace_id,
2735                error = %write_err,
2736                "Failed to write error response header"
2737            );
2738        } else {
2739            // Write the body
2740            if let Err(write_err) = session
2741                .write_response_body(Some(bytes::Bytes::from(body)), true)
2742                .await
2743            {
2744                warn!(
2745                    correlation_id = %ctx.trace_id,
2746                    error = %write_err,
2747                    "Failed to write error response body"
2748                );
2749            }
2750        }
2751
2752        // Return the error response info
2753        // can_reuse_downstream: false since we already wrote and closed the response
2754        pingora_proxy::FailToProxy {
2755            error_code,
2756            can_reuse_downstream: false,
2757        }
2758    }
2759
2760    /// Handle errors that occur during proxying after upstream connection is established.
2761    ///
2762    /// This method enables retry logic and circuit breaker integration.
2763    /// It's called when an error occurs during the request/response exchange
2764    /// with the upstream server.
2765    fn error_while_proxy(
2766        &self,
2767        peer: &HttpPeer,
2768        session: &mut Session,
2769        e: Box<Error>,
2770        ctx: &mut Self::CTX,
2771        client_reused: bool,
2772    ) -> Box<Error> {
2773        let error_type = e.etype().clone();
2774        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2775
2776        // Classify error for retry decisions
2777        let is_retryable = matches!(
2778            error_type,
2779            ErrorType::ConnectTimedout
2780                | ErrorType::ReadTimedout
2781                | ErrorType::WriteTimedout
2782                | ErrorType::ConnectionClosed
2783                | ErrorType::ConnectRefused
2784        );
2785
2786        // Log the error with context
2787        warn!(
2788            correlation_id = %ctx.trace_id,
2789            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2790            upstream = %upstream_id,
2791            peer_address = %peer.address(),
2792            error_type = ?error_type,
2793            error = %e,
2794            client_reused = client_reused,
2795            is_retryable = is_retryable,
2796            "Error during proxy operation"
2797        );
2798
2799        // Record failure with circuit breaker via upstream pool
2800        // This is done asynchronously since we can't await in a sync fn
2801        let peer_address = peer.address().to_string();
2802        let upstream_pools = self.upstream_pools.clone();
2803        let upstream_id_owned = upstream_id.to_string();
2804        tokio::spawn(async move {
2805            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2806                pool.report_result(&peer_address, false).await;
2807            }
2808        });
2809
2810        // Metrics tracking
2811        self.metrics
2812            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2813
2814        // Create enhanced error with retry information
2815        let mut enhanced_error = e.more_context(format!(
2816            "Upstream: {}, Peer: {}, Attempts: {}",
2817            upstream_id,
2818            peer.address(),
2819            ctx.upstream_attempts
2820        ));
2821
2822        // Determine if retry should be attempted:
2823        // - Only retry if error is retryable type
2824        // - Only retry reused connections if buffer isn't truncated
2825        // - Track retry metrics
2826        if is_retryable {
2827            let can_retry = if client_reused {
2828                // For reused connections, check if retry buffer is intact
2829                !session.as_ref().retry_buffer_truncated()
2830            } else {
2831                // Fresh connections can always retry
2832                true
2833            };
2834
2835            enhanced_error.retry.decide_reuse(can_retry);
2836
2837            if can_retry {
2838                debug!(
2839                    correlation_id = %ctx.trace_id,
2840                    upstream = %upstream_id,
2841                    error_type = ?error_type,
2842                    "Error is retryable, will attempt retry"
2843                );
2844            }
2845        } else {
2846            // Non-retryable error - don't retry
2847            enhanced_error.retry.decide_reuse(false);
2848        }
2849
2850        enhanced_error
2851    }
2852
2853    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2854        // Decrement active requests
2855        self.reload_coordinator.dec_requests();
2856
2857        // === Fire pending shadow request (if body buffering was enabled) ===
2858        if !ctx.shadow_sent {
2859            if let Some(shadow_pending) = ctx.shadow_pending.take() {
2860                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2861                    // Clone the buffered body for the shadow request
2862                    Some(ctx.body_buffer.clone())
2863                } else {
2864                    None
2865                };
2866
2867                trace!(
2868                    correlation_id = %ctx.trace_id,
2869                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2870                    "Firing deferred shadow request with buffered body"
2871                );
2872
2873                shadow_pending.manager.shadow_request(
2874                    shadow_pending.headers,
2875                    body,
2876                    shadow_pending.request_ctx,
2877                );
2878                ctx.shadow_sent = true;
2879            }
2880        }
2881
2882        let duration = ctx.elapsed();
2883
2884        // Get response status
2885        let status = session
2886            .response_written()
2887            .map(|r| r.status.as_u16())
2888            .unwrap_or(0);
2889
2890        // Report result to load balancer for adaptive LB feedback
2891        // This enables latency-aware weight adjustment
2892        if let (Some(ref peer_addr), Some(ref upstream_id)) =
2893            (&ctx.selected_upstream_address, &ctx.upstream)
2894        {
2895            // Success = status code < 500 (client errors are not upstream failures)
2896            let success = status > 0 && status < 500;
2897
2898            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2899                pool.report_result_with_latency(peer_addr, success, Some(duration))
2900                    .await;
2901                trace!(
2902                    correlation_id = %ctx.trace_id,
2903                    upstream = %upstream_id,
2904                    peer_address = %peer_addr,
2905                    success = success,
2906                    duration_ms = duration.as_millis(),
2907                    status = status,
2908                    "Reported result to adaptive load balancer"
2909                );
2910            }
2911
2912            // Track warmth for inference routes (cold model detection)
2913            if ctx.inference_rate_limit_enabled && success {
2914                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2915                if cold_detected {
2916                    debug!(
2917                        correlation_id = %ctx.trace_id,
2918                        upstream = %upstream_id,
2919                        peer_address = %peer_addr,
2920                        duration_ms = duration.as_millis(),
2921                        "Cold model detected on inference upstream"
2922                    );
2923                }
2924            }
2925        }
2926
2927        // Record actual token usage for inference rate limiting
2928        // This adjusts the token bucket based on actual vs estimated tokens
2929        if ctx.inference_rate_limit_enabled {
2930            if let (Some(route_id), Some(ref rate_limit_key)) =
2931                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2932            {
2933                // Try to extract actual tokens from response headers
2934                let response_headers = session
2935                    .response_written()
2936                    .map(|r| &r.headers)
2937                    .cloned()
2938                    .unwrap_or_default();
2939
2940                // For streaming responses, finalize the streaming token counter
2941                let streaming_result = if ctx.inference_streaming_response {
2942                    ctx.inference_streaming_counter
2943                        .as_ref()
2944                        .map(|counter| counter.finalize())
2945                } else {
2946                    None
2947                };
2948
2949                // Log streaming token count info
2950                if let Some(ref result) = streaming_result {
2951                    debug!(
2952                        correlation_id = %ctx.trace_id,
2953                        output_tokens = result.output_tokens,
2954                        input_tokens = ?result.input_tokens,
2955                        source = ?result.source,
2956                        content_length = result.content_length,
2957                        "Finalized streaming token count"
2958                    );
2959                }
2960
2961                // PII detection guardrail (for streaming inference responses)
2962                if ctx.inference_streaming_response {
2963                    if let Some(ref route_config) = ctx.route_config {
2964                        if let Some(ref inference) = route_config.inference {
2965                            if let Some(ref guardrails) = inference.guardrails {
2966                                if let Some(ref pii_config) = guardrails.pii_detection {
2967                                    if pii_config.enabled {
2968                                        // Get accumulated content from streaming counter
2969                                        if let Some(ref counter) = ctx.inference_streaming_counter {
2970                                            let response_content = counter.content();
2971                                            if !response_content.is_empty() {
2972                                                let pii_result = self
2973                                                    .guardrail_processor
2974                                                    .check_pii(
2975                                                        pii_config,
2976                                                        response_content,
2977                                                        ctx.route_id.as_deref(),
2978                                                        &ctx.trace_id,
2979                                                    )
2980                                                    .await;
2981
2982                                                match pii_result {
2983                                                    crate::inference::PiiCheckResult::Detected {
2984                                                        detections,
2985                                                        redacted_content: _,
2986                                                    } => {
2987                                                        warn!(
2988                                                            correlation_id = %ctx.trace_id,
2989                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2990                                                            detection_count = detections.len(),
2991                                                            "PII detected in inference response"
2992                                                        );
2993
2994                                                        // Store detection categories for logging
2995                                                        ctx.pii_detection_categories = detections
2996                                                            .iter()
2997                                                            .map(|d| d.category.clone())
2998                                                            .collect();
2999
3000                                                        // Record metrics for each category
3001                                                        for detection in &detections {
3002                                                            self.metrics.record_pii_detected(
3003                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
3004                                                                &detection.category,
3005                                                            );
3006                                                        }
3007                                                    }
3008                                                    crate::inference::PiiCheckResult::Clean => {
3009                                                        trace!(
3010                                                            correlation_id = %ctx.trace_id,
3011                                                            "No PII detected in response"
3012                                                        );
3013                                                    }
3014                                                    crate::inference::PiiCheckResult::Error { message } => {
3015                                                        debug!(
3016                                                            correlation_id = %ctx.trace_id,
3017                                                            error = %message,
3018                                                            "PII detection check failed"
3019                                                        );
3020                                                    }
3021                                                }
3022                                            }
3023                                        }
3024                                    }
3025                                }
3026                            }
3027                        }
3028                    }
3029                }
3030
3031                // Response body would require buffering, which is expensive
3032                // For non-streaming, most LLM APIs provide token counts in headers
3033                // For streaming, we use the accumulated SSE content
3034                let empty_body: &[u8] = &[];
3035
3036                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3037                    route_id,
3038                    rate_limit_key,
3039                    &response_headers,
3040                    empty_body,
3041                    ctx.inference_estimated_tokens,
3042                ) {
3043                    // Use streaming result if available and header extraction failed
3044                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
3045                        // Prefer API-provided counts from streaming, otherwise use tiktoken count
3046                        if streaming.total_tokens.is_some() {
3047                            (streaming.total_tokens.unwrap(), "streaming_api")
3048                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
3049                            // Header extraction failed, use streaming tiktoken count
3050                            // Estimate total by adding input estimate + output from streaming
3051                            let total = ctx.inference_input_tokens + streaming.output_tokens;
3052                            (total, "streaming_tiktoken")
3053                        } else {
3054                            (actual_estimate.tokens, "headers")
3055                        }
3056                    } else {
3057                        (actual_estimate.tokens, "headers")
3058                    };
3059
3060                    ctx.inference_actual_tokens = Some(actual_tokens);
3061
3062                    debug!(
3063                        correlation_id = %ctx.trace_id,
3064                        route_id = route_id,
3065                        estimated_tokens = ctx.inference_estimated_tokens,
3066                        actual_tokens = actual_tokens,
3067                        source = source_info,
3068                        streaming_response = ctx.inference_streaming_response,
3069                        model = ?ctx.inference_model,
3070                        "Recorded actual inference tokens"
3071                    );
3072
3073                    // Record budget usage with actual tokens (if budget tracking enabled)
3074                    if ctx.inference_budget_enabled {
3075                        let alerts = self.inference_rate_limit_manager.record_budget(
3076                            route_id,
3077                            rate_limit_key,
3078                            actual_tokens,
3079                        );
3080
3081                        // Log any budget alerts that fired
3082                        for alert in alerts.iter() {
3083                            warn!(
3084                                correlation_id = %ctx.trace_id,
3085                                route_id = route_id,
3086                                tenant = %alert.tenant,
3087                                threshold_pct = alert.threshold * 100.0,
3088                                tokens_used = alert.tokens_used,
3089                                tokens_limit = alert.tokens_limit,
3090                                "Token budget alert threshold crossed"
3091                            );
3092                        }
3093
3094                        // Update context with remaining budget
3095                        if let Some(status) = self.inference_rate_limit_manager.budget_status(
3096                            route_id,
3097                            rate_limit_key,
3098                        ) {
3099                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3100                        }
3101                    }
3102
3103                    // Calculate cost if cost attribution is enabled
3104                    if ctx.inference_cost_enabled {
3105                        if let Some(model) = ctx.inference_model.as_deref() {
3106                            // Use streaming result for more accurate input/output split if available
3107                            let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3108                                // Streaming gives us accurate output tokens
3109                                let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3110                                let output = streaming.output_tokens;
3111                                (input, output)
3112                            } else {
3113                                // Fallback: estimate output from total - input
3114                                let input = ctx.inference_input_tokens;
3115                                let output = actual_tokens.saturating_sub(input);
3116                                (input, output)
3117                            };
3118                            ctx.inference_output_tokens = output_tokens;
3119
3120                            if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3121                                route_id,
3122                                model,
3123                                input_tokens,
3124                                output_tokens,
3125                            ) {
3126                                ctx.inference_request_cost = Some(cost_result.total_cost);
3127
3128                                trace!(
3129                                    correlation_id = %ctx.trace_id,
3130                                    route_id = route_id,
3131                                    model = model,
3132                                    input_tokens = input_tokens,
3133                                    output_tokens = output_tokens,
3134                                    total_cost = cost_result.total_cost,
3135                                    currency = %cost_result.currency,
3136                                    "Calculated inference request cost"
3137                                );
3138                            }
3139                        }
3140                    }
3141                }
3142            }
3143        }
3144
3145        // Write to access log file if configured
3146        if self.log_manager.access_log_enabled() {
3147            let access_entry = AccessLogEntry {
3148                timestamp: chrono::Utc::now().to_rfc3339(),
3149                trace_id: ctx.trace_id.clone(),
3150                method: ctx.method.clone(),
3151                path: ctx.path.clone(),
3152                query: ctx.query.clone(),
3153                protocol: "HTTP/1.1".to_string(),
3154                status,
3155                body_bytes: ctx.response_bytes,
3156                duration_ms: duration.as_millis() as u64,
3157                client_ip: ctx.client_ip.clone(),
3158                user_agent: ctx.user_agent.clone(),
3159                referer: ctx.referer.clone(),
3160                host: ctx.host.clone(),
3161                route_id: ctx.route_id.clone(),
3162                upstream: ctx.upstream.clone(),
3163                upstream_attempts: ctx.upstream_attempts,
3164                instance_id: self.app_state.instance_id.clone(),
3165                namespace: ctx.namespace.clone(),
3166                service: ctx.service.clone(),
3167                // New fields
3168                body_bytes_sent: ctx.response_bytes,
3169                upstream_addr: ctx.selected_upstream_address.clone(),
3170                connection_reused: ctx.connection_reused,
3171                rate_limit_hit: status == 429,
3172                geo_country: ctx.geo_country_code.clone(),
3173            };
3174            self.log_manager.log_access(&access_entry);
3175        }
3176
3177        // Log to tracing at debug level (avoid allocations if debug disabled)
3178        if tracing::enabled!(tracing::Level::DEBUG) {
3179            debug!(
3180                trace_id = %ctx.trace_id,
3181                method = %ctx.method,
3182                path = %ctx.path,
3183                route_id = ?ctx.route_id,
3184                upstream = ?ctx.upstream,
3185                status = status,
3186                duration_ms = duration.as_millis() as u64,
3187                upstream_attempts = ctx.upstream_attempts,
3188                error = ?_error.map(|e| e.to_string()),
3189                "Request completed"
3190            );
3191        }
3192
3193        // Log WebSocket upgrades at info level
3194        if ctx.is_websocket_upgrade && status == 101 {
3195            info!(
3196                trace_id = %ctx.trace_id,
3197                route_id = ?ctx.route_id,
3198                upstream = ?ctx.upstream,
3199                client_ip = %ctx.client_ip,
3200                "WebSocket connection established"
3201            );
3202        }
3203
3204        // End OpenTelemetry span
3205        if let Some(span) = ctx.otel_span.take() {
3206            span.end();
3207        }
3208    }
3209}
3210
3211// =============================================================================
3212// Helper methods for body streaming (not part of ProxyHttp trait)
3213// =============================================================================
3214
3215impl SentinelProxy {
3216    /// Process a single body chunk in streaming mode.
3217    async fn process_body_chunk_streaming(
3218        &self,
3219        body: &mut Option<Bytes>,
3220        end_of_stream: bool,
3221        ctx: &mut RequestContext,
3222    ) -> Result<(), Box<Error>> {
3223        // Clone the chunk data to avoid borrowing issues when mutating body later
3224        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3225        let chunk_index = ctx.request_body_chunk_index;
3226        ctx.request_body_chunk_index += 1;
3227        ctx.body_bytes_inspected += chunk_data.len() as u64;
3228
3229        debug!(
3230            correlation_id = %ctx.trace_id,
3231            chunk_index = chunk_index,
3232            chunk_size = chunk_data.len(),
3233            end_of_stream = end_of_stream,
3234            "Streaming body chunk to agents"
3235        );
3236
3237        // Create agent call context
3238        let agent_ctx = crate::agents::AgentCallContext {
3239            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3240            metadata: sentinel_agent_protocol::RequestMetadata {
3241                correlation_id: ctx.trace_id.clone(),
3242                request_id: ctx.trace_id.clone(),
3243                client_ip: ctx.client_ip.clone(),
3244                client_port: 0,
3245                server_name: ctx.host.clone(),
3246                protocol: "HTTP/1.1".to_string(),
3247                tls_version: None,
3248                tls_cipher: None,
3249                route_id: ctx.route_id.clone(),
3250                upstream_id: ctx.upstream.clone(),
3251                timestamp: chrono::Utc::now().to_rfc3339(),
3252                traceparent: ctx.traceparent(),
3253            },
3254            route_id: ctx.route_id.clone(),
3255            upstream_id: ctx.upstream.clone(),
3256            request_body: None, // Not used in streaming mode
3257            response_body: None,
3258        };
3259
3260        let agent_ids = ctx.body_inspection_agents.clone();
3261        let total_size = None; // Unknown in streaming mode
3262
3263        match self
3264            .agent_manager
3265            .process_request_body_streaming(
3266                &agent_ctx,
3267                &chunk_data,
3268                end_of_stream,
3269                chunk_index,
3270                ctx.body_bytes_inspected as usize,
3271                total_size,
3272                &agent_ids,
3273            )
3274            .await
3275        {
3276            Ok(decision) => {
3277                // Track if agent needs more data
3278                ctx.agent_needs_more = decision.needs_more;
3279
3280                // Apply body mutation if present
3281                if let Some(ref mutation) = decision.request_body_mutation {
3282                    if !mutation.is_pass_through() {
3283                        if mutation.is_drop() {
3284                            // Drop the chunk
3285                            *body = None;
3286                            trace!(
3287                                correlation_id = %ctx.trace_id,
3288                                chunk_index = chunk_index,
3289                                "Agent dropped body chunk"
3290                            );
3291                        } else if let Some(ref new_data) = mutation.data {
3292                            // Replace chunk with mutated content
3293                            *body = Some(Bytes::from(new_data.clone()));
3294                            trace!(
3295                                correlation_id = %ctx.trace_id,
3296                                chunk_index = chunk_index,
3297                                original_size = chunk_data.len(),
3298                                new_size = new_data.len(),
3299                                "Agent mutated body chunk"
3300                            );
3301                        }
3302                    }
3303                }
3304
3305                // Check decision (only final if needs_more is false)
3306                if !decision.needs_more && !decision.is_allow() {
3307                    warn!(
3308                        correlation_id = %ctx.trace_id,
3309                        action = ?decision.action,
3310                        "Agent blocked request body"
3311                    );
3312                    self.metrics.record_blocked_request("agent_body_inspection");
3313
3314                    let (status, message) = match &decision.action {
3315                        crate::agents::AgentAction::Block { status, body, .. } => (
3316                            *status,
3317                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3318                        ),
3319                        _ => (403, "Forbidden".to_string()),
3320                    };
3321
3322                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3323                }
3324
3325                trace!(
3326                    correlation_id = %ctx.trace_id,
3327                    needs_more = decision.needs_more,
3328                    "Agent processed body chunk"
3329                );
3330            }
3331            Err(e) => {
3332                let fail_closed = ctx
3333                    .route_config
3334                    .as_ref()
3335                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3336                    .unwrap_or(false);
3337
3338                if fail_closed {
3339                    error!(
3340                        correlation_id = %ctx.trace_id,
3341                        error = %e,
3342                        "Agent streaming body inspection failed, blocking (fail-closed)"
3343                    );
3344                    return Err(Error::explain(
3345                        ErrorType::HTTPStatus(503),
3346                        "Service unavailable",
3347                    ));
3348                } else {
3349                    warn!(
3350                        correlation_id = %ctx.trace_id,
3351                        error = %e,
3352                        "Agent streaming body inspection failed, allowing (fail-open)"
3353                    );
3354                }
3355            }
3356        }
3357
3358        Ok(())
3359    }
3360
3361    /// Send buffered body to agents (buffer mode).
3362    async fn send_buffered_body_to_agents(
3363        &self,
3364        end_of_stream: bool,
3365        ctx: &mut RequestContext,
3366    ) -> Result<(), Box<Error>> {
3367        debug!(
3368            correlation_id = %ctx.trace_id,
3369            buffer_size = ctx.body_buffer.len(),
3370            end_of_stream = end_of_stream,
3371            agent_count = ctx.body_inspection_agents.len(),
3372            decompression_enabled = ctx.decompression_enabled,
3373            "Sending buffered body to agents for inspection"
3374        );
3375
3376        // Decompress body if enabled and we have a supported encoding
3377        let body_for_inspection = if ctx.decompression_enabled {
3378            if let Some(ref encoding) = ctx.body_content_encoding {
3379                let config = crate::decompression::DecompressionConfig {
3380                    max_ratio: ctx.max_decompression_ratio,
3381                    max_output_bytes: ctx.max_decompression_bytes,
3382                };
3383
3384                match crate::decompression::decompress_body(
3385                    &ctx.body_buffer,
3386                    encoding,
3387                    &config,
3388                ) {
3389                    Ok(result) => {
3390                        ctx.body_was_decompressed = true;
3391                        self.metrics
3392                            .record_decompression_success(encoding, result.ratio);
3393                        debug!(
3394                            correlation_id = %ctx.trace_id,
3395                            encoding = %encoding,
3396                            compressed_size = result.compressed_size,
3397                            decompressed_size = result.decompressed_size,
3398                            ratio = result.ratio,
3399                            "Body decompressed for agent inspection"
3400                        );
3401                        result.data
3402                    }
3403                    Err(e) => {
3404                        // Record failure metric
3405                        let failure_reason = match &e {
3406                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
3407                                "ratio_exceeded"
3408                            }
3409                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
3410                                "size_exceeded"
3411                            }
3412                            crate::decompression::DecompressionError::InvalidData { .. } => {
3413                                "invalid_data"
3414                            }
3415                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3416                                "unsupported"
3417                            }
3418                            crate::decompression::DecompressionError::IoError(_) => "io_error",
3419                        };
3420                        self.metrics
3421                            .record_decompression_failure(encoding, failure_reason);
3422
3423                        // Decompression failed - decide based on failure mode
3424                        let fail_closed = ctx
3425                            .route_config
3426                            .as_ref()
3427                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3428                            .unwrap_or(false);
3429
3430                        if fail_closed {
3431                            error!(
3432                                correlation_id = %ctx.trace_id,
3433                                error = %e,
3434                                encoding = %encoding,
3435                                "Decompression failed, blocking (fail-closed)"
3436                            );
3437                            return Err(Error::explain(
3438                                ErrorType::HTTPStatus(400),
3439                                "Invalid compressed body",
3440                            ));
3441                        } else {
3442                            warn!(
3443                                correlation_id = %ctx.trace_id,
3444                                error = %e,
3445                                encoding = %encoding,
3446                                "Decompression failed, sending compressed body (fail-open)"
3447                            );
3448                            ctx.body_buffer.clone()
3449                        }
3450                    }
3451                }
3452            } else {
3453                ctx.body_buffer.clone()
3454            }
3455        } else {
3456            ctx.body_buffer.clone()
3457        };
3458
3459        let agent_ctx = crate::agents::AgentCallContext {
3460            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3461            metadata: sentinel_agent_protocol::RequestMetadata {
3462                correlation_id: ctx.trace_id.clone(),
3463                request_id: ctx.trace_id.clone(),
3464                client_ip: ctx.client_ip.clone(),
3465                client_port: 0,
3466                server_name: ctx.host.clone(),
3467                protocol: "HTTP/1.1".to_string(),
3468                tls_version: None,
3469                tls_cipher: None,
3470                route_id: ctx.route_id.clone(),
3471                upstream_id: ctx.upstream.clone(),
3472                timestamp: chrono::Utc::now().to_rfc3339(),
3473                traceparent: ctx.traceparent(),
3474            },
3475            route_id: ctx.route_id.clone(),
3476            upstream_id: ctx.upstream.clone(),
3477            request_body: Some(body_for_inspection.clone()),
3478            response_body: None,
3479        };
3480
3481        let agent_ids = ctx.body_inspection_agents.clone();
3482        match self
3483            .agent_manager
3484            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3485            .await
3486        {
3487            Ok(decision) => {
3488                if !decision.is_allow() {
3489                    warn!(
3490                        correlation_id = %ctx.trace_id,
3491                        action = ?decision.action,
3492                        "Agent blocked request body"
3493                    );
3494                    self.metrics.record_blocked_request("agent_body_inspection");
3495
3496                    let (status, message) = match &decision.action {
3497                        crate::agents::AgentAction::Block { status, body, .. } => (
3498                            *status,
3499                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3500                        ),
3501                        _ => (403, "Forbidden".to_string()),
3502                    };
3503
3504                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3505                }
3506
3507                trace!(
3508                    correlation_id = %ctx.trace_id,
3509                    "Agent allowed request body"
3510                );
3511            }
3512            Err(e) => {
3513                let fail_closed = ctx
3514                    .route_config
3515                    .as_ref()
3516                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3517                    .unwrap_or(false);
3518
3519                if fail_closed {
3520                    error!(
3521                        correlation_id = %ctx.trace_id,
3522                        error = %e,
3523                        "Agent body inspection failed, blocking (fail-closed)"
3524                    );
3525                    return Err(Error::explain(
3526                        ErrorType::HTTPStatus(503),
3527                        "Service unavailable",
3528                    ));
3529                } else {
3530                    warn!(
3531                        correlation_id = %ctx.trace_id,
3532                        error = %e,
3533                        "Agent body inspection failed, allowing (fail-open)"
3534                    );
3535                }
3536            }
3537        }
3538
3539        Ok(())
3540    }
3541}