// sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.

6use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14    CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::inference::{
23    extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
24};
25use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
26use crate::rate_limit::HeaderAccessor;
27use crate::routing::RequestInfo;
28
29use super::context::{FallbackReason, RequestContext};
30use super::fallback::FallbackEvaluator;
31use super::fallback_metrics::get_fallback_metrics;
32use super::model_routing;
33use super::model_routing_metrics::get_model_routing_metrics;
34use super::SentinelProxy;
35
36/// Helper type for rate limiting when we don't need header access
37struct NoHeaderAccessor;
38impl HeaderAccessor for NoHeaderAccessor {
39    fn get_header(&self, _name: &str) -> Option<String> {
40        None
41    }
42}
43
44#[async_trait]
45impl ProxyHttp for SentinelProxy {
46    type CTX = RequestContext;
47
48    fn new_ctx(&self) -> Self::CTX {
49        RequestContext::new()
50    }
51
52    fn fail_to_connect(
53        &self,
54        _session: &mut Session,
55        peer: &HttpPeer,
56        ctx: &mut Self::CTX,
57        e: Box<Error>,
58    ) -> Box<Error> {
59        error!(
60            correlation_id = %ctx.trace_id,
61            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
62            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
63            peer_address = %peer.address(),
64            error = %e,
65            "Failed to connect to upstream peer"
66        );
67        // Custom error pages are handled in response_filter
68        e
69    }
70
    /// Early request filter — runs before upstream selection.
    ///
    /// Responsibilities, in order:
    /// 1. Serve ACME HTTP-01 challenge responses directly (no upstream needed).
    /// 2. Seed the request context (method, path, host) for later phases.
    /// 3. Match the route, capture W3C trace context, and start an OTel span.
    /// 4. Handle builtin routes that don't need an upstream connection.
    ///
    /// Returns `Err` deliberately after the response has already been fully
    /// written (ACME challenge or builtin handler) so Pingora stops the
    /// request lifecycle instead of trying to pick an upstream.
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        // Host header may be absent or non-UTF-8; fall back to "".
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        // Handle ACME HTTP-01 challenges before any other processing
        if let Some(ref challenge_manager) = self.acme_challenges {
            if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
                if let Some(key_authorization) = challenge_manager.get_response(token) {
                    debug!(
                        token = %token,
                        "Serving ACME HTTP-01 challenge response"
                    );

                    // Build response
                    let mut resp = ResponseHeader::build(200, None)?;
                    resp.insert_header("Content-Type", "text/plain")?;
                    resp.insert_header("Content-Length", key_authorization.len().to_string())?;

                    // Send response: header first (not end-of-stream), then
                    // the body with the end-of-stream flag set.
                    session.write_response_header(Box::new(resp), false).await?;
                    session
                        .write_response_body(Some(Bytes::from(key_authorization)), true)
                        .await?;

                    // Return error to signal request is complete — the
                    // response was already fully written above, so this Err
                    // only tells Pingora to stop processing.
                    return Err(Error::explain(
                        ErrorType::InternalError,
                        "ACME challenge served",
                    ));
                } else {
                    // Token not found - could be a stale request or attack.
                    // Fall through to normal routing rather than erroring.
                    warn!(
                        token = %token,
                        "ACME challenge token not found"
                    );
                }
            }
        }

        // Seed the context so later phases (upstream_peer, logging) can
        // reuse these without re-reading the request header.
        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Parse incoming W3C trace context if present
        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
            if let Ok(s) = traceparent.to_str() {
                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
            }
        }

        // Start OpenTelemetry request span if tracing is enabled
        if let Some(tracer) = crate::otel::get_tracer() {
            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
        }

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }
178
    /// Select the upstream peer for this request.
    ///
    /// This is the main routing phase. In order:
    /// 1. Track the active request and cache config/client/request metadata
    ///    on the context for later logging.
    /// 2. Reuse the route matched by `early_request_filter`, or match now
    ///    (with the sync `RwLock` scoped so it is dropped before any await).
    /// 3. Short-circuit builtin and static routes with an `Err` — those are
    ///    served without an upstream connection.
    /// 4. For inference routes: apply model-based routing, then evaluate
    ///    pre-request fallback (unhealthy pool / exhausted budget).
    /// 5. Select a peer from the upstream pool with bounded retries and
    ///    exponential backoff.
    ///
    /// Returns `Err` for builtin/static routes and when every peer-selection
    /// attempt fails.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        // (early_request_filter may have populated method/path/host already).
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                // NOTE(review): ctx.path comes from uri.path(), which excludes
                // the query string — confirm parse_query_params can actually
                // see any parameters here, or whether it should receive
                // path-and-query instead.
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            // Set trace_id if not already set by early_request_filter
            if ctx.trace_id.is_empty() {
                ctx.trace_id = self.get_trace_id(session);

                // Parse incoming W3C trace context if present
                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
                    if let Ok(s) = traceparent.to_str() {
                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
                    }
                }

                // Start OpenTelemetry request span if tracing is enabled
                if let Some(tracer) = crate::otel::get_tracer() {
                    ctx.otel_span =
                        Some(tracer.start_span(&ctx.method, &ctx.path, ctx.trace_context.as_ref()));
                }
            }

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter.
            // The "_builtin_" prefix is a sentinel recognized downstream.
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
            // NOTE(review): if no static server is registered for this route
            // the code falls through to normal upstream handling — confirm
            // that is intended rather than a 404.
        }

        // === Model-based routing (for inference routes) ===
        // Check if model routing is configured and select upstream based on model
        let mut model_routing_applied = false;
        if let Some(ref inference) = route_match.config.inference {
            if let Some(ref model_routing) = inference.model_routing {
                // Try to extract model from headers (fast path - no body parsing needed)
                let model = model_routing::extract_model_from_headers(&req_header.headers);

                if let Some(ref model_name) = model {
                    // Find upstream for this model
                    if let Some(routing_result) =
                        model_routing::find_upstream_for_model(model_routing, model_name)
                    {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = %route_match.route_id,
                            model = %model_name,
                            upstream = %routing_result.upstream,
                            is_default = routing_result.is_default,
                            provider_override = ?routing_result.provider,
                            "Model-based routing selected upstream"
                        );

                        ctx.record_model_routing(
                            &routing_result.upstream,
                            Some(model_name.clone()),
                            routing_result.provider,
                        );
                        model_routing_applied = true;

                        // Record metrics
                        if let Some(metrics) = get_model_routing_metrics() {
                            metrics.record_model_routed(
                                route_match.route_id.as_str(),
                                model_name,
                                &routing_result.upstream,
                            );
                            if routing_result.is_default {
                                metrics.record_default_upstream(route_match.route_id.as_str());
                            }
                            if let Some(provider) = routing_result.provider {
                                metrics.record_provider_override(
                                    route_match.route_id.as_str(),
                                    &routing_result.upstream,
                                    provider.as_str(),
                                );
                            }
                        }
                    }
                } else if let Some(ref default_upstream) = model_routing.default_upstream {
                    // No model in headers, use default upstream
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_match.route_id,
                        upstream = %default_upstream,
                        "Model-based routing using default upstream (no model header)"
                    );
                    ctx.record_model_routing(default_upstream, None, None);
                    model_routing_applied = true;

                    // Record metrics for no model header case
                    if let Some(metrics) = get_model_routing_metrics() {
                        metrics.record_no_model_header(route_match.route_id.as_str());
                    }
                }
            }
        }

        // Regular route with upstream (if model routing didn't set it)
        if !model_routing_applied {
            if let Some(ref upstream) = route_match.config.upstream {
                ctx.upstream = Some(upstream.clone());
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    upstream = %upstream,
                    "Upstream configured for route"
                );
            } else {
                error!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    "Route has no upstream configured"
                );
                return Err(Error::explain(
                    ErrorType::InternalError,
                    format!(
                        "Route '{}' has no upstream configured",
                        route_match.route_id
                    ),
                ));
            }
        }

        // === Fallback routing evaluation (pre-request) ===
        // Check if fallback should be triggered due to health or budget conditions
        if let Some(ref fallback_config) = route_match.config.fallback {
            // By this point ctx.upstream is guaranteed set (model routing or
            // the regular-route branch above), so unwrap cannot panic here.
            let upstream_name = ctx.upstream.as_ref().unwrap();

            // Check if primary upstream is healthy
            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
                pool.has_healthy_targets().await
            } else {
                false // Pool not found, treat as unhealthy
            };

            // Check if budget is exhausted (for inference routes)
            let is_budget_exhausted = ctx.inference_budget_exhausted;

            // Get model name for model mapping (inference routes)
            let current_model = ctx.inference_model.as_deref();

            // Create fallback evaluator
            let evaluator = FallbackEvaluator::new(
                fallback_config,
                ctx.tried_upstreams(),
                ctx.fallback_attempt,
            );

            // Evaluate pre-request fallback conditions
            if let Some(decision) = evaluator.should_fallback_before_request(
                upstream_name,
                is_healthy,
                is_budget_exhausted,
                current_model,
            ) {
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    from_upstream = %upstream_name,
                    to_upstream = %decision.next_upstream,
                    reason = %decision.reason,
                    fallback_attempt = ctx.fallback_attempt + 1,
                    "Triggering fallback routing"
                );

                // Record fallback metrics
                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_attempt(
                        route_match.route_id.as_str(),
                        upstream_name,
                        &decision.next_upstream,
                        &decision.reason,
                    );
                }

                // Record fallback in context
                ctx.record_fallback(decision.reason, &decision.next_upstream);

                // Apply model mapping if present
                if let Some((original, mapped)) = decision.model_mapping {
                    // Record model mapping metrics
                    if let Some(metrics) = get_fallback_metrics() {
                        metrics.record_model_mapping(
                            route_match.route_id.as_str(),
                            &original,
                            &mapped,
                        );
                    }

                    ctx.record_model_mapping(original, mapped);
                    trace!(
                        correlation_id = %ctx.trace_id,
                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
                        "Applied model mapping for fallback"
                    );
                }
            }
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries; a missing retry policy means a
        // single attempt.
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer_with_metadata(None).await {
                Ok((peer, metadata)) => {
                    let selection_duration = selection_start.elapsed();
                    // Store selected peer address for feedback reporting in logging()
                    let peer_addr = peer.address().to_string();
                    ctx.selected_upstream_address = Some(peer_addr.clone());

                    // Copy sticky session metadata to context for response_filter
                    if metadata.contains_key("sticky_session_new") {
                        ctx.sticky_session_new_assignment = true;
                        ctx.sticky_session_set_cookie =
                            metadata.get("sticky_set_cookie_header").cloned();
                        ctx.sticky_target_index = metadata
                            .get("sticky_target_index")
                            .and_then(|s| s.parse().ok());

                        trace!(
                            correlation_id = %ctx.trace_id,
                            sticky_target_index = ?ctx.sticky_target_index,
                            "New sticky session assignment, will set cookie"
                        );
                    }

                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer_addr,
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        sticky_session_hit = metadata.contains_key("sticky_session_hit"),
                        sticky_session_new = ctx.sticky_session_new_assignment,
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency):
                        // 100ms, 200ms, 400ms, ... doubling per attempt.
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        // All attempts exhausted — log and surface the last error.
        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        // Record exhausted metric if fallback was used but all upstreams failed
        if ctx.used_fallback() {
            if let Some(metrics) = get_fallback_metrics() {
                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
            }
        }

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }
693
694    async fn request_filter(
695        &self,
696        session: &mut Session,
697        ctx: &mut Self::CTX,
698    ) -> Result<bool, Box<Error>> {
699        trace!(
700            correlation_id = %ctx.trace_id,
701            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
702            "Starting request filter phase"
703        );
704
705        // Check rate limiting early (before other processing)
706        // Fast path: skip if no rate limiting is configured for this route
707        if let Some(route_id) = ctx.route_id.as_deref() {
708            if self.rate_limit_manager.has_route_limiter(route_id) {
709                let rate_result = self.rate_limit_manager.check(
710                    route_id,
711                    &ctx.client_ip,
712                    &ctx.path,
713                    Option::<&NoHeaderAccessor>::None,
714                );
715
716                // Store rate limit info for response headers (even if allowed)
717                if rate_result.limit > 0 {
718                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
719                        limit: rate_result.limit,
720                        remaining: rate_result.remaining,
721                        reset_at: rate_result.reset_at,
722                    });
723                }
724
725                if !rate_result.allowed {
726                    use sentinel_config::RateLimitAction;
727
728                    match rate_result.action {
729                        RateLimitAction::Reject => {
730                            warn!(
731                                correlation_id = %ctx.trace_id,
732                                route_id = route_id,
733                                client_ip = %ctx.client_ip,
734                                limiter = %rate_result.limiter,
735                                limit = rate_result.limit,
736                                remaining = rate_result.remaining,
737                                "Request rate limited"
738                            );
739                            self.metrics.record_blocked_request("rate_limited");
740
741                            // Audit log the rate limit
742                            let audit_entry = AuditLogEntry::rate_limited(
743                                &ctx.trace_id,
744                                &ctx.method,
745                                &ctx.path,
746                                &ctx.client_ip,
747                                &rate_result.limiter,
748                            )
749                            .with_route_id(route_id)
750                            .with_status_code(rate_result.status_code);
751                            self.log_manager.log_audit(&audit_entry);
752
753                            // Send rate limit response with headers
754                            let body = rate_result
755                                .message
756                                .unwrap_or_else(|| "Rate limit exceeded".to_string());
757
758                            // Build response with rate limit headers
759                            let retry_after = rate_result.reset_at.saturating_sub(
760                                std::time::SystemTime::now()
761                                    .duration_since(std::time::UNIX_EPOCH)
762                                    .unwrap_or_default()
763                                    .as_secs(),
764                            );
765                            crate::http_helpers::write_rate_limit_error(
766                                session,
767                                rate_result.status_code,
768                                &body,
769                                rate_result.limit,
770                                rate_result.remaining,
771                                rate_result.reset_at,
772                                retry_after,
773                            )
774                            .await?;
775                            return Ok(true); // Request complete, don't continue
776                        }
777                        RateLimitAction::LogOnly => {
778                            debug!(
779                                correlation_id = %ctx.trace_id,
780                                route_id = route_id,
781                                "Rate limit exceeded (log only mode)"
782                            );
783                            // Continue processing
784                        }
785                        RateLimitAction::Delay => {
786                            // Apply delay if suggested by rate limiter
787                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
788                                // Cap delay at the configured maximum
789                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);
790
791                                if actual_delay > 0 {
792                                    debug!(
793                                        correlation_id = %ctx.trace_id,
794                                        route_id = route_id,
795                                        suggested_delay_ms = delay_ms,
796                                        max_delay_ms = rate_result.max_delay_ms,
797                                        actual_delay_ms = actual_delay,
798                                        "Applying rate limit delay"
799                                    );
800
801                                    tokio::time::sleep(std::time::Duration::from_millis(
802                                        actual_delay,
803                                    ))
804                                    .await;
805                                }
806                            }
807                            // Continue processing after delay
808                        }
809                    }
810                }
811            }
812        }
813
814        // Inference rate limiting (token-based, for LLM/AI routes)
815        // This runs after regular rate limiting and checks service type
816        if let Some(route_id) = ctx.route_id.as_deref() {
817            if let Some(ref route_config) = ctx.route_config {
818                if route_config.service_type == sentinel_config::ServiceType::Inference
819                    && self.inference_rate_limit_manager.has_route(route_id)
820                {
821                    // For inference rate limiting, we need access to the request body
822                    // to estimate tokens. We'll use buffered body if available.
823                    let headers = &session.req_header().headers;
824
825                    // Try to get buffered body, or use empty (will estimate from headers only)
826                    let body = ctx.body_buffer.as_slice();
827
828                    // Use client IP as the rate limit key (could be enhanced to use API key header)
829                    let rate_limit_key = &ctx.client_ip;
830
831                    if let Some(check_result) = self.inference_rate_limit_manager.check(
832                        route_id,
833                        rate_limit_key,
834                        headers,
835                        body,
836                    ) {
837                        // Store inference rate limiting context for recording actual tokens later
838                        ctx.inference_rate_limit_enabled = true;
839                        ctx.inference_estimated_tokens = check_result.estimated_tokens;
840                        ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
841                        ctx.inference_model = check_result.model.clone();
842
843                        if !check_result.is_allowed() {
844                            let retry_after_ms = check_result.retry_after_ms();
845                            let retry_after_secs = retry_after_ms.div_ceil(1000);
846
847                            warn!(
848                                correlation_id = %ctx.trace_id,
849                                route_id = route_id,
850                                client_ip = %ctx.client_ip,
851                                estimated_tokens = check_result.estimated_tokens,
852                                model = ?check_result.model,
853                                retry_after_ms = retry_after_ms,
854                                "Inference rate limit exceeded (tokens)"
855                            );
856                            self.metrics
857                                .record_blocked_request("inference_rate_limited");
858
859                            // Audit log the token rate limit
860                            let audit_entry = AuditLogEntry::new(
861                                &ctx.trace_id,
862                                AuditEventType::RateLimitExceeded,
863                                &ctx.method,
864                                &ctx.path,
865                                &ctx.client_ip,
866                            )
867                            .with_route_id(route_id)
868                            .with_status_code(429)
869                            .with_reason(format!(
870                                "Token rate limit exceeded: estimated {} tokens, model={:?}",
871                                check_result.estimated_tokens, check_result.model
872                            ));
873                            self.log_manager.log_audit(&audit_entry);
874
875                            // Send 429 response with appropriate headers
876                            let body = "Token rate limit exceeded";
877                            let reset_at = std::time::SystemTime::now()
878                                .duration_since(std::time::UNIX_EPOCH)
879                                .unwrap_or_default()
880                                .as_secs()
881                                + retry_after_secs;
882
883                            // Use simplified error write for inference rate limit
884                            crate::http_helpers::write_rate_limit_error(
885                                session,
886                                429,
887                                body,
888                                0, // No request limit
889                                0, // No remaining
890                                reset_at,
891                                retry_after_secs,
892                            )
893                            .await?;
894                            return Ok(true); // Request complete, don't continue
895                        }
896
897                        trace!(
898                            correlation_id = %ctx.trace_id,
899                            route_id = route_id,
900                            estimated_tokens = check_result.estimated_tokens,
901                            model = ?check_result.model,
902                            "Inference rate limit check passed"
903                        );
904
905                        // Check budget tracking (cumulative per-period limits)
906                        if self.inference_rate_limit_manager.has_budget(route_id) {
907                            ctx.inference_budget_enabled = true;
908
909                            if let Some(budget_result) =
910                                self.inference_rate_limit_manager.check_budget(
911                                    route_id,
912                                    rate_limit_key,
913                                    check_result.estimated_tokens,
914                                )
915                            {
916                                if !budget_result.is_allowed() {
917                                    let retry_after_secs = budget_result.retry_after_secs();
918
919                                    warn!(
920                                        correlation_id = %ctx.trace_id,
921                                        route_id = route_id,
922                                        client_ip = %ctx.client_ip,
923                                        estimated_tokens = check_result.estimated_tokens,
924                                        retry_after_secs = retry_after_secs,
925                                        "Token budget exhausted"
926                                    );
927
928                                    ctx.inference_budget_exhausted = true;
929                                    self.metrics.record_blocked_request("budget_exhausted");
930
931                                    // Audit log the budget exhaustion
932                                    let audit_entry = AuditLogEntry::new(
933                                        &ctx.trace_id,
934                                        AuditEventType::RateLimitExceeded,
935                                        &ctx.method,
936                                        &ctx.path,
937                                        &ctx.client_ip,
938                                    )
939                                    .with_route_id(route_id)
940                                    .with_status_code(429)
941                                    .with_reason("Token budget exhausted".to_string());
942                                    self.log_manager.log_audit(&audit_entry);
943
944                                    // Send 429 response with budget headers
945                                    let body = "Token budget exhausted";
946                                    let reset_at = std::time::SystemTime::now()
947                                        .duration_since(std::time::UNIX_EPOCH)
948                                        .unwrap_or_default()
949                                        .as_secs()
950                                        + retry_after_secs;
951
952                                    crate::http_helpers::write_rate_limit_error(
953                                        session,
954                                        429,
955                                        body,
956                                        0,
957                                        0,
958                                        reset_at,
959                                        retry_after_secs,
960                                    )
961                                    .await?;
962                                    return Ok(true);
963                                }
964
965                                // Capture budget status for response headers
966                                let remaining = match &budget_result {
967                                    sentinel_common::budget::BudgetCheckResult::Allowed {
968                                        remaining,
969                                    } => *remaining as i64,
970                                    sentinel_common::budget::BudgetCheckResult::Soft {
971                                        remaining,
972                                        ..
973                                    } => *remaining,
974                                    _ => 0,
975                                };
976                                ctx.inference_budget_remaining = Some(remaining);
977
978                                // Get period reset time from budget status
979                                if let Some(status) = self
980                                    .inference_rate_limit_manager
981                                    .budget_status(route_id, rate_limit_key)
982                                {
983                                    ctx.inference_budget_period_reset = Some(status.period_end);
984                                }
985
986                                trace!(
987                                    correlation_id = %ctx.trace_id,
988                                    route_id = route_id,
989                                    budget_remaining = remaining,
990                                    "Token budget check passed"
991                                );
992                            }
993                        }
994
995                        // Check if cost attribution is enabled
996                        if self
997                            .inference_rate_limit_manager
998                            .has_cost_attribution(route_id)
999                        {
1000                            ctx.inference_cost_enabled = true;
1001                        }
1002                    }
1003                }
1004            }
1005        }
1006
1007        // Prompt injection guardrail (for inference routes)
1008        if let Some(ref route_config) = ctx.route_config {
1009            if let Some(ref inference) = route_config.inference {
1010                if let Some(ref guardrails) = inference.guardrails {
1011                    if let Some(ref pi_config) = guardrails.prompt_injection {
1012                        if pi_config.enabled && !ctx.body_buffer.is_empty() {
1013                            ctx.guardrails_enabled = true;
1014
1015                            // Extract content from request body
1016                            if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1017                                let result = self
1018                                    .guardrail_processor
1019                                    .check_prompt_injection(
1020                                        pi_config,
1021                                        &content,
1022                                        ctx.inference_model.as_deref(),
1023                                        ctx.route_id.as_deref(),
1024                                        &ctx.trace_id,
1025                                    )
1026                                    .await;
1027
1028                                match result {
1029                                    PromptInjectionResult::Blocked {
1030                                        status,
1031                                        message,
1032                                        detections,
1033                                    } => {
1034                                        warn!(
1035                                            correlation_id = %ctx.trace_id,
1036                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1037                                            detection_count = detections.len(),
1038                                            "Prompt injection detected, blocking request"
1039                                        );
1040
1041                                        self.metrics.record_blocked_request("prompt_injection");
1042
1043                                        // Store detection categories for logging
1044                                        ctx.guardrail_detection_categories =
1045                                            detections.iter().map(|d| d.category.clone()).collect();
1046
1047                                        // Audit log the block
1048                                        let audit_entry = AuditLogEntry::new(
1049                                            &ctx.trace_id,
1050                                            AuditEventType::Blocked,
1051                                            &ctx.method,
1052                                            &ctx.path,
1053                                            &ctx.client_ip,
1054                                        )
1055                                        .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1056                                        .with_status_code(status)
1057                                        .with_reason("Prompt injection detected".to_string());
1058                                        self.log_manager.log_audit(&audit_entry);
1059
1060                                        // Send error response
1061                                        crate::http_helpers::write_json_error(
1062                                            session,
1063                                            status,
1064                                            "prompt_injection_blocked",
1065                                            Some(&message),
1066                                        )
1067                                        .await?;
1068                                        return Ok(true);
1069                                    }
1070                                    PromptInjectionResult::Detected { detections } => {
1071                                        // Log but allow
1072                                        warn!(
1073                                            correlation_id = %ctx.trace_id,
1074                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1075                                            detection_count = detections.len(),
1076                                            "Prompt injection detected (logged only)"
1077                                        );
1078                                        ctx.guardrail_detection_categories =
1079                                            detections.iter().map(|d| d.category.clone()).collect();
1080                                    }
1081                                    PromptInjectionResult::Warning { detections } => {
1082                                        // Set flag for response header
1083                                        ctx.guardrail_warning = true;
1084                                        ctx.guardrail_detection_categories =
1085                                            detections.iter().map(|d| d.category.clone()).collect();
1086                                        debug!(
1087                                            correlation_id = %ctx.trace_id,
1088                                            "Prompt injection warning set"
1089                                        );
1090                                    }
1091                                    PromptInjectionResult::Clean => {
1092                                        trace!(
1093                                            correlation_id = %ctx.trace_id,
1094                                            "No prompt injection detected"
1095                                        );
1096                                    }
1097                                    PromptInjectionResult::Error { message } => {
1098                                        // Already logged in processor, just trace here
1099                                        trace!(
1100                                            correlation_id = %ctx.trace_id,
1101                                            error = %message,
1102                                            "Prompt injection check error (failure mode applied)"
1103                                        );
1104                                    }
1105                                }
1106                            }
1107                        }
1108                    }
1109                }
1110            }
1111        }
1112
1113        // Geo filtering
1114        if let Some(route_id) = ctx.route_id.as_deref() {
1115            if let Some(ref route_config) = ctx.route_config {
1116                for filter_id in &route_config.filters {
1117                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1118                        // Store country code for response header
1119                        ctx.geo_country_code = result.country_code.clone();
1120                        ctx.geo_lookup_performed = true;
1121
1122                        if !result.allowed {
1123                            warn!(
1124                                correlation_id = %ctx.trace_id,
1125                                route_id = route_id,
1126                                client_ip = %ctx.client_ip,
1127                                country = ?result.country_code,
1128                                filter_id = %filter_id,
1129                                "Request blocked by geo filter"
1130                            );
1131                            self.metrics.record_blocked_request("geo_blocked");
1132
1133                            // Audit log the geo block
1134                            let audit_entry = AuditLogEntry::new(
1135                                &ctx.trace_id,
1136                                AuditEventType::Blocked,
1137                                &ctx.method,
1138                                &ctx.path,
1139                                &ctx.client_ip,
1140                            )
1141                            .with_route_id(route_id)
1142                            .with_status_code(result.status_code)
1143                            .with_reason(format!(
1144                                "Geo blocked: country={}, filter={}",
1145                                result.country_code.as_deref().unwrap_or("unknown"),
1146                                filter_id
1147                            ));
1148                            self.log_manager.log_audit(&audit_entry);
1149
1150                            // Send geo block response
1151                            let body = result
1152                                .block_message
1153                                .unwrap_or_else(|| "Access denied".to_string());
1154
1155                            crate::http_helpers::write_error(
1156                                session,
1157                                result.status_code,
1158                                &body,
1159                                "text/plain",
1160                            )
1161                            .await?;
1162                            return Ok(true); // Request complete, don't continue
1163                        }
1164
1165                        // Only check first geo filter that matches
1166                        break;
1167                    }
1168                }
1169            }
1170        }
1171
1172        // Check for WebSocket upgrade requests
1173        let is_websocket_upgrade = session
1174            .req_header()
1175            .headers
1176            .get(http::header::UPGRADE)
1177            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1178            .unwrap_or(false);
1179
1180        if is_websocket_upgrade {
1181            ctx.is_websocket_upgrade = true;
1182
1183            // Check if route allows WebSocket upgrades
1184            if let Some(ref route_config) = ctx.route_config {
1185                if !route_config.websocket {
1186                    warn!(
1187                        correlation_id = %ctx.trace_id,
1188                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1189                        client_ip = %ctx.client_ip,
1190                        "WebSocket upgrade rejected: not enabled for route"
1191                    );
1192
1193                    self.metrics.record_blocked_request("websocket_not_enabled");
1194
1195                    // Audit log the rejection
1196                    let audit_entry = AuditLogEntry::new(
1197                        &ctx.trace_id,
1198                        AuditEventType::Blocked,
1199                        &ctx.method,
1200                        &ctx.path,
1201                        &ctx.client_ip,
1202                    )
1203                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1204                    .with_action("websocket_rejected")
1205                    .with_reason("WebSocket not enabled for route");
1206                    self.log_manager.log_audit(&audit_entry);
1207
1208                    // Send 403 Forbidden response
1209                    crate::http_helpers::write_error(
1210                        session,
1211                        403,
1212                        "WebSocket not enabled for this route",
1213                        "text/plain",
1214                    )
1215                    .await?;
1216                    return Ok(true); // Request complete, don't continue
1217                }
1218
1219                debug!(
1220                    correlation_id = %ctx.trace_id,
1221                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1222                    "WebSocket upgrade request allowed"
1223                );
1224
1225                // Check for WebSocket frame inspection
1226                if route_config.websocket_inspection {
1227                    // Check for compression negotiation - skip inspection if permessage-deflate
1228                    let has_compression = session
1229                        .req_header()
1230                        .headers
1231                        .get("Sec-WebSocket-Extensions")
1232                        .and_then(|v| v.to_str().ok())
1233                        .map(|s| s.contains("permessage-deflate"))
1234                        .unwrap_or(false);
1235
1236                    if has_compression {
1237                        debug!(
1238                            correlation_id = %ctx.trace_id,
1239                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1240                            "WebSocket inspection skipped: permessage-deflate negotiated"
1241                        );
1242                        ctx.websocket_skip_inspection = true;
1243                    } else {
1244                        ctx.websocket_inspection_enabled = true;
1245
1246                        // Get agents that handle WebSocketFrame events
1247                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1248                            sentinel_agent_protocol::EventType::WebSocketFrame,
1249                        );
1250
1251                        debug!(
1252                            correlation_id = %ctx.trace_id,
1253                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1254                            agent_count = ctx.websocket_inspection_agents.len(),
1255                            "WebSocket frame inspection enabled"
1256                        );
1257                    }
1258                }
1259            }
1260        }
1261
1262        // Use cached route config from upstream_peer (avoids duplicate route matching)
1263        // Handle static file and builtin routes
1264        if let Some(route_config) = ctx.route_config.clone() {
1265            if route_config.service_type == sentinel_config::ServiceType::Static {
1266                trace!(
1267                    correlation_id = %ctx.trace_id,
1268                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1269                    "Handling static file route"
1270                );
1271                // Create a minimal RouteMatch for the handler
1272                let route_match = crate::routing::RouteMatch {
1273                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1274                    config: route_config.clone(),
1275                };
1276                return self.handle_static_route(session, ctx, &route_match).await;
1277            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
1278                trace!(
1279                    correlation_id = %ctx.trace_id,
1280                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1281                    builtin_handler = ?route_config.builtin_handler,
1282                    "Handling builtin route"
1283                );
1284                // Create a minimal RouteMatch for the handler
1285                let route_match = crate::routing::RouteMatch {
1286                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1287                    config: route_config.clone(),
1288                };
1289                return self.handle_builtin_route(session, ctx, &route_match).await;
1290            }
1291        }
1292
1293        // API validation for API routes
1294        if let Some(route_id) = ctx.route_id.clone() {
1295            if let Some(validator) = self.validators.get(&route_id).await {
1296                trace!(
1297                    correlation_id = %ctx.trace_id,
1298                    route_id = %route_id,
1299                    "Running API schema validation"
1300                );
1301                if let Some(result) = self
1302                    .validate_api_request(session, ctx, &route_id, &validator)
1303                    .await?
1304                {
1305                    debug!(
1306                        correlation_id = %ctx.trace_id,
1307                        route_id = %route_id,
1308                        validation_passed = result,
1309                        "API validation complete"
1310                    );
1311                    return Ok(result);
1312                }
1313            }
1314        }
1315
1316        // Get client address before mutable borrow
1317        let client_addr = session
1318            .client_addr()
1319            .map(|a| format!("{}", a))
1320            .unwrap_or_else(|| "unknown".to_string());
1321        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1322
1323        let req_header = session.req_header_mut();
1324
1325        // Add correlation ID header
1326        req_header
1327            .insert_header("X-Correlation-Id", &ctx.trace_id)
1328            .ok();
1329        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1330
1331        // Use cached config (set in upstream_peer, or fetch now if needed)
1332        let config = ctx
1333            .config
1334            .get_or_insert_with(|| self.config_manager.current());
1335
1336        // Enforce header limits (fast path: skip if limits are very high)
1337        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
1338
1339        // Header count check - O(1)
1340        let header_count = req_header.headers.len();
1341        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1342            && header_count > config.limits.max_header_count
1343        {
1344            warn!(
1345                correlation_id = %ctx.trace_id,
1346                header_count = header_count,
1347                limit = config.limits.max_header_count,
1348                "Request blocked: exceeds header count limit"
1349            );
1350
1351            self.metrics.record_blocked_request("header_count_exceeded");
1352            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1353        }
1354
1355        // Header size check - O(n), skip if limit is very high
1356        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1357            let total_header_size: usize = req_header
1358                .headers
1359                .iter()
1360                .map(|(k, v)| k.as_str().len() + v.len())
1361                .sum();
1362
1363            if total_header_size > config.limits.max_header_size_bytes {
1364                warn!(
1365                    correlation_id = %ctx.trace_id,
1366                    header_size = total_header_size,
1367                    limit = config.limits.max_header_size_bytes,
1368                    "Request blocked: exceeds header size limit"
1369                );
1370
1371                self.metrics.record_blocked_request("header_size_exceeded");
1372                return Err(Error::explain(
1373                    ErrorType::InternalError,
1374                    "Headers too large",
1375                ));
1376            }
1377        }
1378
1379        // Process through external agents
1380        trace!(
1381            correlation_id = %ctx.trace_id,
1382            "Processing request through agents"
1383        );
1384        if let Err(e) = self
1385            .process_agents(session, ctx, &client_addr, client_port)
1386            .await
1387        {
1388            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
1389            // In that case, we need to send a proper HTTP response instead of just closing the connection
1390            if let ErrorType::HTTPStatus(status) = e.etype() {
1391                // Extract the message from the error (the context part after "HTTPStatus context:")
1392                let error_msg = e.to_string();
1393                let body = error_msg
1394                    .split("context:")
1395                    .nth(1)
1396                    .map(|s| s.trim())
1397                    .unwrap_or("Request blocked");
1398                debug!(
1399                    correlation_id = %ctx.trace_id,
1400                    status = status,
1401                    body = %body,
1402                    "Sending HTTP error response for agent block"
1403                );
1404                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1405                return Ok(true); // Request complete, don't continue to upstream
1406            }
1407            // For other errors, propagate them
1408            return Err(e);
1409        }
1410
1411        trace!(
1412            correlation_id = %ctx.trace_id,
1413            "Request filter phase complete, forwarding to upstream"
1414        );
1415
1416        Ok(false) // Continue processing
1417    }
1418
1419    /// Process incoming request body chunks.
1420    /// Used for body size enforcement and WAF/agent inspection.
1421    ///
1422    /// Supports two modes:
1423    /// - **Buffer mode** (default): Buffer chunks until end of stream or limit, then send to agents
1424    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
1425    async fn request_body_filter(
1426        &self,
1427        _session: &mut Session,
1428        body: &mut Option<Bytes>,
1429        end_of_stream: bool,
1430        ctx: &mut Self::CTX,
1431    ) -> Result<(), Box<Error>> {
1432        use sentinel_config::BodyStreamingMode;
1433
1434        // Handle WebSocket frame inspection (client -> server)
1435        if ctx.is_websocket_upgrade {
1436            if let Some(ref handler) = ctx.websocket_handler {
1437                let result = handler.process_client_data(body.take()).await;
1438                match result {
1439                    crate::websocket::ProcessResult::Forward(data) => {
1440                        *body = data;
1441                    }
1442                    crate::websocket::ProcessResult::Close(reason) => {
1443                        warn!(
1444                            correlation_id = %ctx.trace_id,
1445                            code = reason.code,
1446                            reason = %reason.reason,
1447                            "WebSocket connection closed by agent (client->server)"
1448                        );
1449                        // Return an error to close the connection
1450                        return Err(Error::explain(
1451                            ErrorType::InternalError,
1452                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
1453                        ));
1454                    }
1455                }
1456            }
1457            // Skip normal body processing for WebSocket
1458            return Ok(());
1459        }
1460
1461        // Track request body size
1462        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1463        if chunk_len > 0 {
1464            ctx.request_body_bytes += chunk_len as u64;
1465
1466            trace!(
1467                correlation_id = %ctx.trace_id,
1468                chunk_size = chunk_len,
1469                total_body_bytes = ctx.request_body_bytes,
1470                end_of_stream = end_of_stream,
1471                streaming_mode = ?ctx.request_body_streaming_mode,
1472                "Processing request body chunk"
1473            );
1474
1475            // Check body size limit (use cached config)
1476            let config = ctx
1477                .config
1478                .get_or_insert_with(|| self.config_manager.current());
1479            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1480                warn!(
1481                    correlation_id = %ctx.trace_id,
1482                    body_bytes = ctx.request_body_bytes,
1483                    limit = config.limits.max_body_size_bytes,
1484                    "Request body size limit exceeded"
1485                );
1486                self.metrics.record_blocked_request("body_size_exceeded");
1487                return Err(Error::explain(
1488                    ErrorType::InternalError,
1489                    "Request body too large",
1490                ));
1491            }
1492        }
1493
1494        // Body inspection for agents (WAF, etc.)
1495        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1496            let config = ctx
1497                .config
1498                .get_or_insert_with(|| self.config_manager.current());
1499            let max_inspection_bytes = config
1500                .waf
1501                .as_ref()
1502                .map(|w| w.body_inspection.max_inspection_bytes as u64)
1503                .unwrap_or(1024 * 1024);
1504
1505            match ctx.request_body_streaming_mode {
1506                BodyStreamingMode::Stream => {
1507                    // Stream mode: send each chunk immediately
1508                    if body.is_some() {
1509                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1510                            .await?;
1511                    } else if end_of_stream && ctx.agent_needs_more {
1512                        // Send final empty chunk to signal end
1513                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1514                            .await?;
1515                    }
1516                }
1517                BodyStreamingMode::Hybrid { buffer_threshold } => {
1518                    // Hybrid mode: buffer up to threshold, then stream
1519                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1520                        // Still in buffering phase
1521                        if let Some(ref chunk) = body {
1522                            let bytes_to_buffer = std::cmp::min(
1523                                chunk.len(),
1524                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1525                            );
1526                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1527                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1528
1529                            // If we've reached threshold or end of stream, switch to streaming
1530                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1531                            {
1532                                // Send buffered content first
1533                                self.send_buffered_body_to_agents(
1534                                    end_of_stream && chunk.len() == bytes_to_buffer,
1535                                    ctx,
1536                                )
1537                                .await?;
1538                                ctx.body_buffer.clear();
1539
1540                                // If there's remaining data in this chunk, stream it
1541                                if bytes_to_buffer < chunk.len() {
1542                                    let remaining = chunk.slice(bytes_to_buffer..);
1543                                    let mut remaining_body = Some(remaining);
1544                                    self.process_body_chunk_streaming(
1545                                        &mut remaining_body,
1546                                        end_of_stream,
1547                                        ctx,
1548                                    )
1549                                    .await?;
1550                                }
1551                            }
1552                        }
1553                    } else {
1554                        // Past threshold, stream directly
1555                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1556                            .await?;
1557                    }
1558                }
1559                BodyStreamingMode::Buffer => {
1560                    // Buffer mode: collect chunks until ready to send
1561                    if let Some(ref chunk) = body {
1562                        if ctx.body_bytes_inspected < max_inspection_bytes {
1563                            let bytes_to_inspect = std::cmp::min(
1564                                chunk.len() as u64,
1565                                max_inspection_bytes - ctx.body_bytes_inspected,
1566                            ) as usize;
1567
1568                            ctx.body_buffer
1569                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1570                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1571
1572                            trace!(
1573                                correlation_id = %ctx.trace_id,
1574                                bytes_inspected = ctx.body_bytes_inspected,
1575                                max_inspection_bytes = max_inspection_bytes,
1576                                buffer_size = ctx.body_buffer.len(),
1577                                "Buffering body for agent inspection"
1578                            );
1579                        }
1580                    }
1581
1582                    // Send when complete or limit reached
1583                    let should_send =
1584                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1585                    if should_send && !ctx.body_buffer.is_empty() {
1586                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1587                            .await?;
1588                        ctx.body_buffer.clear();
1589                    }
1590                }
1591            }
1592        }
1593
1594        if end_of_stream {
1595            trace!(
1596                correlation_id = %ctx.trace_id,
1597                total_body_bytes = ctx.request_body_bytes,
1598                bytes_inspected = ctx.body_bytes_inspected,
1599                "Request body complete"
1600            );
1601        }
1602
1603        Ok(())
1604    }
1605
    /// Post-upstream response phase: decorates the response with proxy headers
    /// (correlation, rate-limit, budget, GeoIP, sticky-session, guardrail,
    /// fallback), wires up WebSocket frame inspection on a 101 upgrade,
    /// initializes SSE token counting for inference routes, rewrites error
    /// pages, and records metrics / OTel span status / passive health.
    ///
    /// NOTE(review): the statement order below is load-bearing — headers must
    /// be inserted before `handle_error_response` may rewrite the response,
    /// and metrics/health recording intentionally happens last.
    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let status = upstream_response.status.as_u16();
        // Total time since the request context was created, used for logs/metrics.
        let duration = ctx.elapsed();

        trace!(
            correlation_id = %ctx.trace_id,
            status = status,
            "Starting response filter phase"
        );

        // Handle WebSocket 101 Switching Protocols
        if status == 101 && ctx.is_websocket_upgrade {
            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
                // Create WebSocket inspector and handler with metrics
                let inspector = crate::websocket::WebSocketInspector::with_metrics(
                    self.agent_manager.clone(),
                    ctx.route_id
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string()),
                    ctx.trace_id.clone(),
                    ctx.client_ip.clone(),
                    100, // 100ms timeout per frame inspection
                    Some(self.metrics.clone()),
                );

                let handler = crate::websocket::WebSocketHandler::new(
                    std::sync::Arc::new(inspector),
                    1024 * 1024, // 1MB max frame size
                );

                // Stored on the context so request/response body filters can
                // route frames through the handler from here on.
                ctx.websocket_handler = Some(std::sync::Arc::new(handler));

                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    agent_count = ctx.websocket_inspection_agents.len(),
                    "WebSocket upgrade successful, frame inspection enabled"
                );
            } else if ctx.websocket_skip_inspection {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
                );
            } else {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful"
                );
            }
        }

        // Apply security headers
        trace!(
            correlation_id = %ctx.trace_id,
            "Applying security headers"
        );
        // Best-effort: a failure to add security headers must not fail the response.
        self.apply_security_headers(upstream_response).ok();

        // Add correlation ID to response
        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;

        // Add rate limit headers if rate limiting was applied
        if let Some(ref rate_info) = ctx.rate_limit_info {
            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
            upstream_response
                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
        }

        // Add token budget headers if budget tracking was enabled
        if ctx.inference_budget_enabled {
            if let Some(remaining) = ctx.inference_budget_remaining {
                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
            }
            if let Some(period_reset) = ctx.inference_budget_period_reset {
                // Format as ISO 8601 timestamp
                // (falls back to the raw epoch seconds if the timestamp is out
                // of chrono's representable range)
                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
                    .unwrap_or_else(|| period_reset.to_string());
                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
            }
        }

        // Add GeoIP country header if geo lookup was performed
        if let Some(ref country_code) = ctx.geo_country_code {
            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
        }

        // Add sticky session cookie if a new assignment was made
        // (cookie value was prepared earlier in the request flow)
        if ctx.sticky_session_new_assignment {
            if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
                upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
                trace!(
                    correlation_id = %ctx.trace_id,
                    sticky_target_index = ?ctx.sticky_target_index,
                    "Added sticky session Set-Cookie header"
                );
            }
        }

        // Add guardrail warning header if prompt injection was detected (warn mode)
        if ctx.guardrail_warning {
            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
        }

        // Add fallback routing headers if fallback was used
        if ctx.used_fallback() {
            upstream_response.insert_header("X-Fallback-Used", "true")?;

            if let Some(ref upstream) = ctx.upstream {
                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
            }

            if let Some(ref reason) = ctx.fallback_reason {
                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
            }

            if let Some(ref original) = ctx.original_upstream {
                upstream_response.insert_header("X-Original-Upstream", original)?;
            }

            if let Some(ref mapping) = ctx.model_mapping_applied {
                upstream_response
                    .insert_header("X-Model-Mapping", format!("{} -> {}", mapping.0, mapping.1))?;
            }

            trace!(
                correlation_id = %ctx.trace_id,
                fallback_attempt = ctx.fallback_attempt,
                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
                "Added fallback response headers"
            );

            // Record fallback success metrics for successful responses (2xx/3xx)
            if status < 400 {
                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_success(
                        ctx.route_id.as_deref().unwrap_or("unknown"),
                        ctx.upstream.as_deref().unwrap_or("unknown"),
                    );
                }
            }
        }

        // Initialize streaming token counter for SSE responses on inference routes
        if ctx.inference_rate_limit_enabled {
            // Check if this is an SSE response
            let content_type = upstream_response
                .headers
                .get("content-type")
                .and_then(|ct| ct.to_str().ok());

            if is_sse_response(content_type) {
                // Get provider from route config
                let provider = ctx
                    .route_config
                    .as_ref()
                    .and_then(|r| r.inference.as_ref())
                    .map(|i| i.provider)
                    .unwrap_or_default();

                // The counter is consumed by the response body filter to meter
                // tokens as SSE chunks stream through.
                ctx.inference_streaming_response = true;
                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
                    provider,
                    ctx.inference_model.clone(),
                ));

                trace!(
                    correlation_id = %ctx.trace_id,
                    content_type = ?content_type,
                    model = ?ctx.inference_model,
                    "Initialized streaming token counter for SSE response"
                );
            }
        }

        // Generate custom error pages for error responses
        if status >= 400 {
            trace!(
                correlation_id = %ctx.trace_id,
                status = status,
                "Handling error response"
            );
            self.handle_error_response(upstream_response, ctx).await?;
        }

        // Record metrics
        self.metrics.record_request(
            ctx.route_id.as_deref().unwrap_or("unknown"),
            &ctx.method,
            status,
            duration,
        );

        // Record OpenTelemetry span status
        if let Some(ref mut span) = ctx.otel_span {
            span.set_status(status);
            if let Some(ref upstream) = ctx.upstream {
                span.set_upstream(upstream, "");
            }
            if status >= 500 {
                span.record_error(&format!("HTTP {}", status));
            }
        }

        // Record passive health check
        // Only 5xx counts as an upstream failure; 4xx is treated as a healthy
        // upstream serving a client error.
        if let Some(ref upstream) = ctx.upstream {
            let success = status < 500;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream,
                success = success,
                status = status,
                "Recording passive health check result"
            );

            let error_msg = if !success {
                Some(format!("HTTP {}", status))
            } else {
                None
            };
            self.passive_health
                .record_outcome(upstream, success, error_msg.as_deref())
                .await;

            // Report to upstream pool
            if let Some(pool) = self.upstream_pools.get(upstream).await {
                pool.report_result(upstream, success).await;
            }

            if !success {
                warn!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream,
                    status = status,
                    "Upstream returned error status"
                );
            }
        }

        // Final request completion log
        // Severity tiers: 5xx -> error, 4xx -> warn, otherwise debug.
        if status >= 500 {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed with server error"
            );
        } else if status >= 400 {
            warn!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                "Request completed with client error"
            );
        } else {
            debug!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed"
            );
        }

        Ok(())
    }
1895
1896    /// Modify the request before sending to upstream.
1897    /// Used for header modifications, adding authentication, etc.
1898    async fn upstream_request_filter(
1899        &self,
1900        _session: &mut Session,
1901        upstream_request: &mut pingora::http::RequestHeader,
1902        ctx: &mut Self::CTX,
1903    ) -> Result<()>
1904    where
1905        Self::CTX: Send + Sync,
1906    {
1907        trace!(
1908            correlation_id = %ctx.trace_id,
1909            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1910            "Applying upstream request modifications"
1911        );
1912
1913        // Add trace ID header for upstream correlation
1914        upstream_request
1915            .insert_header("X-Trace-Id", &ctx.trace_id)
1916            .ok();
1917
1918        // Add W3C traceparent header for distributed tracing
1919        if let Some(ref span) = ctx.otel_span {
1920            let sampled = ctx
1921                .trace_context
1922                .as_ref()
1923                .map(|c| c.sampled)
1924                .unwrap_or(true);
1925            let traceparent =
1926                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1927            upstream_request
1928                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1929                .ok();
1930        }
1931
1932        // Add request metadata headers
1933        upstream_request
1934            .insert_header("X-Forwarded-By", "Sentinel")
1935            .ok();
1936
1937        // Apply route-specific request header modifications
1938        // Note: Pingora's IntoCaseHeaderName requires owned String for header names,
1939        // so we clone names but pass values by reference to avoid cloning both.
1940        if let Some(ref route_config) = ctx.route_config {
1941            let mods = &route_config.policies.request_headers;
1942
1943            // Set headers (overwrite existing)
1944            for (name, value) in &mods.set {
1945                upstream_request
1946                    .insert_header(name.clone(), value.as_str())
1947                    .ok();
1948            }
1949
1950            // Add headers (append)
1951            for (name, value) in &mods.add {
1952                upstream_request
1953                    .append_header(name.clone(), value.as_str())
1954                    .ok();
1955            }
1956
1957            // Remove headers
1958            for name in &mods.remove {
1959                upstream_request.remove_header(name);
1960            }
1961
1962            trace!(
1963                correlation_id = %ctx.trace_id,
1964                "Applied request header modifications"
1965            );
1966        }
1967
1968        // Remove sensitive headers that shouldn't go to upstream
1969        upstream_request.remove_header("X-Internal-Token");
1970        upstream_request.remove_header("Authorization-Internal");
1971
1972        // === Traffic Mirroring / Shadowing ===
1973        // Check if this route has shadow configuration
1974        if let Some(ref route_config) = ctx.route_config {
1975            if let Some(ref shadow_config) = route_config.shadow {
1976                // Get snapshot of upstream pools for shadow manager
1977                let pools_snapshot = self.upstream_pools.snapshot().await;
1978                let upstream_pools = std::sync::Arc::new(pools_snapshot);
1979
1980                // Get route ID for metrics labeling
1981                let route_id = ctx
1982                    .route_id
1983                    .clone()
1984                    .unwrap_or_else(|| "unknown".to_string());
1985
1986                // Create shadow manager
1987                let shadow_manager = crate::shadow::ShadowManager::new(
1988                    upstream_pools,
1989                    shadow_config.clone(),
1990                    Some(std::sync::Arc::clone(&self.metrics)),
1991                    route_id,
1992                );
1993
1994                // Check if we should shadow this request (sampling + header check)
1995                if shadow_manager.should_shadow(upstream_request) {
1996                    trace!(
1997                        correlation_id = %ctx.trace_id,
1998                        shadow_upstream = %shadow_config.upstream,
1999                        percentage = shadow_config.percentage,
2000                        "Shadowing request"
2001                    );
2002
2003                    // Clone headers for shadow request
2004                    let shadow_headers = upstream_request.clone();
2005
2006                    // Create request context for shadow (simplified from proxy context)
2007                    let shadow_ctx = crate::upstream::RequestContext {
2008                        client_ip: ctx.client_ip.parse().ok(),
2009                        headers: std::collections::HashMap::new(), // Empty for now
2010                        path: ctx.path.clone(),
2011                        method: ctx.method.clone(),
2012                    };
2013
2014                    // Determine if we should buffer the body
2015                    let buffer_body = shadow_config.buffer_body
2016                        && crate::shadow::should_buffer_method(&ctx.method);
2017
2018                    if buffer_body {
2019                        // Body buffering requested - defer shadow request until body is available
2020                        // Store shadow info in context; will be fired in logging phase
2021                        // or when request body filter completes
2022                        trace!(
2023                            correlation_id = %ctx.trace_id,
2024                            "Deferring shadow request until body is buffered"
2025                        );
2026                        ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2027                            headers: shadow_headers,
2028                            manager: std::sync::Arc::new(shadow_manager),
2029                            request_ctx: shadow_ctx,
2030                            include_body: true,
2031                        });
2032                        // Enable body inspection to capture the body for shadow
2033                        // (only if not already enabled for other reasons)
2034                        if !ctx.body_inspection_enabled {
2035                            ctx.body_inspection_enabled = true;
2036                            // Set a reasonable buffer limit from shadow config
2037                            // (body_buffer will accumulate chunks)
2038                        }
2039                    } else {
2040                        // No body buffering needed - fire shadow request immediately
2041                        shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2042                        ctx.shadow_sent = true;
2043                    }
2044                }
2045            }
2046        }
2047
2048        Ok(())
2049    }
2050
    /// Process response body chunks from upstream.
    /// Used for response size tracking and WAF inspection.
    ///
    /// Note: Response body inspection is currently buffered only (streaming mode not supported
    /// for responses due to Pingora's synchronous filter design).
    ///
    /// Responsibilities, in order:
    /// 1. WebSocket upgrades: pass each server->client chunk through the
    ///    frame-inspection handler, which may forward (possibly rewritten)
    ///    data or request the connection be closed.
    /// 2. Accumulate `ctx.response_bytes` for logging/metrics.
    /// 3. Feed chunks to the SSE streaming token counter, when one is set.
    /// 4. Track (not buffer) how many response bytes fall within the WAF
    ///    inspection budget; actual inspection happens in the logging phase.
    ///
    /// Always returns `Ok(None)` — no artificial delay is requested.
    fn response_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>, Box<Error>> {
        // Handle WebSocket frame inspection (server -> client)
        // Note: This filter is synchronous, so we use block_in_place for async agent calls
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let handler = handler.clone();
                // Take ownership of the chunk; the handler decides what (if
                // anything) is forwarded back into `body`.
                let data = body.take();

                // Use block_in_place to run async handler from sync context
                // This is safe because Pingora uses a multi-threaded tokio runtime
                // (block_in_place panics on a current-thread runtime).
                let result = tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current()
                        .block_on(async { handler.process_server_data(data).await })
                });

                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (server->client)"
                        );
                        // For sync filter, we can't return an error that closes the connection
                        // Instead, inject a close frame
                        // NOTE(review): if encoding fails, the original chunk was
                        // already taken, so the body is silently dropped here.
                        let close_frame =
                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
                            *body = Some(Bytes::from(encoded));
                        }
                    }
                }
            }
            // Skip normal body processing for WebSocket
            return Ok(None);
        }

        // Track response body size
        if let Some(ref chunk) = body {
            ctx.response_bytes += chunk.len() as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk.len(),
                total_response_bytes = ctx.response_bytes,
                end_of_stream = end_of_stream,
                "Processing response body chunk"
            );

            // Process SSE chunks for streaming token counting
            if let Some(ref mut counter) = ctx.inference_streaming_counter {
                let result = counter.process_chunk(chunk);

                if result.content.is_some() || result.is_done {
                    trace!(
                        correlation_id = %ctx.trace_id,
                        has_content = result.content.is_some(),
                        is_done = result.is_done,
                        chunks_processed = counter.chunks_processed(),
                        accumulated_content_len = counter.content().len(),
                        "Processed SSE chunk for token counting"
                    );
                }
            }

            // Response body inspection (buffered mode only)
            // Note: Streaming mode for response bodies is not currently supported
            // due to Pingora's synchronous response_body_filter design
            if ctx.response_body_inspection_enabled
                && !ctx.response_body_inspection_agents.is_empty()
            {
                // Lazily snapshot the config on first use for this request.
                let config = ctx
                    .config
                    .get_or_insert_with(|| self.config_manager.current());
                // Default inspection budget is 1 MiB when no WAF config exists.
                let max_inspection_bytes = config
                    .waf
                    .as_ref()
                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
                    .unwrap_or(1024 * 1024);

                if ctx.response_body_bytes_inspected < max_inspection_bytes {
                    // Cap at the remaining budget so we never count past the limit.
                    let bytes_to_inspect = std::cmp::min(
                        chunk.len() as u64,
                        max_inspection_bytes - ctx.response_body_bytes_inspected,
                    ) as usize;

                    // Buffer for later processing (during logging phase)
                    // Response body inspection happens asynchronously and results
                    // are logged rather than blocking the response
                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
                    ctx.response_body_chunk_index += 1;

                    trace!(
                        correlation_id = %ctx.trace_id,
                        bytes_inspected = ctx.response_body_bytes_inspected,
                        max_inspection_bytes = max_inspection_bytes,
                        chunk_index = ctx.response_body_chunk_index,
                        "Tracking response body for inspection"
                    );
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_response_bytes = ctx.response_bytes,
                response_bytes_inspected = ctx.response_body_bytes_inspected,
                "Response body complete"
            );
        }

        // Return None to indicate no delay needed
        Ok(None)
    }
2181
2182    /// Called when a connection to upstream is established or reused.
2183    /// Logs connection reuse statistics for observability.
2184    async fn connected_to_upstream(
2185        &self,
2186        _session: &mut Session,
2187        reused: bool,
2188        peer: &HttpPeer,
2189        #[cfg(unix)] _fd: RawFd,
2190        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2191        digest: Option<&Digest>,
2192        ctx: &mut Self::CTX,
2193    ) -> Result<(), Box<Error>> {
2194        // Track connection reuse for metrics
2195        ctx.connection_reused = reused;
2196
2197        // Log connection establishment/reuse
2198        if reused {
2199            trace!(
2200                correlation_id = %ctx.trace_id,
2201                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2202                peer_address = %peer.address(),
2203                "Reusing existing upstream connection"
2204            );
2205        } else {
2206            debug!(
2207                correlation_id = %ctx.trace_id,
2208                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2209                peer_address = %peer.address(),
2210                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2211                "Established new upstream connection"
2212            );
2213        }
2214
2215        Ok(())
2216    }
2217
2218    // =========================================================================
2219    // HTTP Caching - Pingora Cache Integration
2220    // =========================================================================
2221
2222    /// Decide if the request should use caching.
2223    ///
2224    /// This method is called early in the request lifecycle to determine if
2225    /// the response should be served from cache or if the response should
2226    /// be cached.
2227    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2228        // Check if route has caching enabled
2229        let route_id = match ctx.route_id.as_deref() {
2230            Some(id) => id,
2231            None => {
2232                trace!(
2233                    correlation_id = %ctx.trace_id,
2234                    "Cache filter: no route ID, skipping cache"
2235                );
2236                return Ok(());
2237            }
2238        };
2239
2240        // Check if caching is enabled for this route
2241        if !self.cache_manager.is_enabled(route_id) {
2242            trace!(
2243                correlation_id = %ctx.trace_id,
2244                route_id = %route_id,
2245                "Cache disabled for route"
2246            );
2247            return Ok(());
2248        }
2249
2250        // Check if method is cacheable (typically GET/HEAD)
2251        if !self
2252            .cache_manager
2253            .is_method_cacheable(route_id, &ctx.method)
2254        {
2255            trace!(
2256                correlation_id = %ctx.trace_id,
2257                route_id = %route_id,
2258                method = %ctx.method,
2259                "Method not cacheable"
2260            );
2261            return Ok(());
2262        }
2263
2264        // Enable caching for this request using Pingora's cache infrastructure
2265        debug!(
2266            correlation_id = %ctx.trace_id,
2267            route_id = %route_id,
2268            method = %ctx.method,
2269            path = %ctx.path,
2270            "Enabling HTTP caching for request"
2271        );
2272
2273        // Get static references to cache infrastructure
2274        let storage = get_cache_storage();
2275        let eviction = get_cache_eviction();
2276        let cache_lock = get_cache_lock();
2277
2278        // Enable the cache with storage, eviction, and lock
2279        session.cache.enable(
2280            storage,
2281            Some(eviction),
2282            None, // predictor - optional
2283            Some(cache_lock),
2284            None, // option overrides
2285        );
2286
2287        // Mark request as cache-eligible in context
2288        ctx.cache_eligible = true;
2289
2290        trace!(
2291            correlation_id = %ctx.trace_id,
2292            route_id = %route_id,
2293            cache_enabled = session.cache.enabled(),
2294            "Cache enabled for request"
2295        );
2296
2297        Ok(())
2298    }
2299
2300    /// Generate the cache key for this request.
2301    ///
2302    /// The cache key uniquely identifies the cached response. It typically
2303    /// includes the method, host, path, and potentially query parameters.
2304    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2305        let req_header = session.req_header();
2306        let method = req_header.method.as_str();
2307        let path = req_header.uri.path();
2308        let host = ctx.host.as_deref().unwrap_or("unknown");
2309        let query = req_header.uri.query();
2310
2311        // Generate cache key using our cache manager
2312        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2313
2314        trace!(
2315            correlation_id = %ctx.trace_id,
2316            cache_key = %key_string,
2317            "Generated cache key"
2318        );
2319
2320        // Use Pingora's default cache key generator which handles
2321        // proper hashing and internal format
2322        Ok(CacheKey::default(req_header))
2323    }
2324
2325    /// Called when a cache miss occurs.
2326    ///
2327    /// This is called when the cache lookup found no matching entry.
2328    /// We can use this to log and track cache misses.
2329    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2330        // Let Pingora handle the cache miss
2331        session.cache.cache_miss();
2332
2333        // Track statistics
2334        if let Some(route_id) = ctx.route_id.as_deref() {
2335            self.cache_manager.stats().record_miss();
2336
2337            trace!(
2338                correlation_id = %ctx.trace_id,
2339                route_id = %route_id,
2340                path = %ctx.path,
2341                "Cache miss"
2342            );
2343        }
2344    }
2345
2346    /// Called after a successful cache lookup.
2347    ///
2348    /// This filter allows inspecting the cached response before serving it.
2349    /// Returns `None` to serve the cached response, or a `ForcedFreshness`
2350    /// to override the freshness decision.
2351    async fn cache_hit_filter(
2352        &self,
2353        session: &mut Session,
2354        meta: &CacheMeta,
2355        _hit_handler: &mut HitHandler,
2356        is_fresh: bool,
2357        ctx: &mut Self::CTX,
2358    ) -> Result<Option<ForcedFreshness>>
2359    where
2360        Self::CTX: Send + Sync,
2361    {
2362        // Check if this cache entry should be invalidated due to a purge request
2363        let req_header = session.req_header();
2364        let method = req_header.method.as_str();
2365        let path = req_header.uri.path();
2366        let host = req_header.uri.host().unwrap_or("localhost");
2367        let query = req_header.uri.query();
2368
2369        // Generate the cache key for this request
2370        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2371
2372        // Check if this key should be invalidated
2373        if self.cache_manager.should_invalidate(&cache_key) {
2374            info!(
2375                correlation_id = %ctx.trace_id,
2376                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2377                cache_key = %cache_key,
2378                "Cache entry invalidated by purge request"
2379            );
2380            // Force expiration so the entry is refetched from upstream
2381            return Ok(Some(ForcedFreshness::ForceExpired));
2382        }
2383
2384        // Track cache hit statistics
2385        if is_fresh {
2386            self.cache_manager.stats().record_hit();
2387
2388            debug!(
2389                correlation_id = %ctx.trace_id,
2390                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2391                is_fresh = is_fresh,
2392                "Cache hit (fresh)"
2393            );
2394        } else {
2395            trace!(
2396                correlation_id = %ctx.trace_id,
2397                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2398                is_fresh = is_fresh,
2399                "Cache hit (stale)"
2400            );
2401        }
2402
2403        // Serve the cached response without invalidation
2404        Ok(None)
2405    }
2406
2407    /// Decide if the response should be cached.
2408    ///
2409    /// Called after receiving the response from upstream to determine
2410    /// if it should be stored in the cache.
2411    fn response_cache_filter(
2412        &self,
2413        _session: &Session,
2414        resp: &ResponseHeader,
2415        ctx: &mut Self::CTX,
2416    ) -> Result<RespCacheable> {
2417        let route_id = match ctx.route_id.as_deref() {
2418            Some(id) => id,
2419            None => {
2420                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2421                    "no_route",
2422                )));
2423            }
2424        };
2425
2426        // Check if caching is enabled for this route
2427        if !self.cache_manager.is_enabled(route_id) {
2428            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2429                "disabled",
2430            )));
2431        }
2432
2433        let status = resp.status.as_u16();
2434
2435        // Check if status code is cacheable
2436        if !self.cache_manager.is_status_cacheable(route_id, status) {
2437            trace!(
2438                correlation_id = %ctx.trace_id,
2439                route_id = %route_id,
2440                status = status,
2441                "Status code not cacheable"
2442            );
2443            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2444        }
2445
2446        // Check Cache-Control header for no-store, no-cache, private
2447        if let Some(cache_control) = resp.headers.get("cache-control") {
2448            if let Ok(cc_str) = cache_control.to_str() {
2449                if crate::cache::CacheManager::is_no_cache(cc_str) {
2450                    trace!(
2451                        correlation_id = %ctx.trace_id,
2452                        route_id = %route_id,
2453                        cache_control = %cc_str,
2454                        "Response has no-cache directive"
2455                    );
2456                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2457                }
2458            }
2459        }
2460
2461        // Calculate TTL from Cache-Control or use default
2462        let cache_control = resp
2463            .headers
2464            .get("cache-control")
2465            .and_then(|v| v.to_str().ok());
2466        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2467
2468        if ttl.is_zero() {
2469            trace!(
2470                correlation_id = %ctx.trace_id,
2471                route_id = %route_id,
2472                "TTL is zero, not caching"
2473            );
2474            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2475        }
2476
2477        // Get route cache config for stale settings
2478        let config = self
2479            .cache_manager
2480            .get_route_config(route_id)
2481            .unwrap_or_default();
2482
2483        // Create timestamps for cache metadata
2484        let now = std::time::SystemTime::now();
2485        let fresh_until = now + ttl;
2486
2487        // Clone the response header for storage
2488        let header = resp.clone();
2489
2490        // Create CacheMeta with proper timestamps and TTLs
2491        let cache_meta = CacheMeta::new(
2492            fresh_until,
2493            now,
2494            config.stale_while_revalidate_secs as u32,
2495            config.stale_if_error_secs as u32,
2496            header,
2497        );
2498
2499        // Track the cache store
2500        self.cache_manager.stats().record_store();
2501
2502        debug!(
2503            correlation_id = %ctx.trace_id,
2504            route_id = %route_id,
2505            status = status,
2506            ttl_secs = ttl.as_secs(),
2507            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2508            stale_if_error_secs = config.stale_if_error_secs,
2509            "Caching response"
2510        );
2511
2512        Ok(RespCacheable::Cacheable(cache_meta))
2513    }
2514
2515    /// Decide whether to serve stale content on error or during revalidation.
2516    ///
2517    /// This implements stale-while-revalidate and stale-if-error semantics.
2518    fn should_serve_stale(
2519        &self,
2520        _session: &mut Session,
2521        ctx: &mut Self::CTX,
2522        error: Option<&Error>,
2523    ) -> bool {
2524        let route_id = match ctx.route_id.as_deref() {
2525            Some(id) => id,
2526            None => return false,
2527        };
2528
2529        // Get route cache config for stale settings
2530        let config = match self.cache_manager.get_route_config(route_id) {
2531            Some(c) => c,
2532            None => return false,
2533        };
2534
2535        // If there's an upstream error, use stale-if-error
2536        if let Some(e) = error {
2537            // Only serve stale for upstream errors
2538            if e.esource() == &pingora::ErrorSource::Upstream {
2539                debug!(
2540                    correlation_id = %ctx.trace_id,
2541                    route_id = %route_id,
2542                    error = %e,
2543                    stale_if_error_secs = config.stale_if_error_secs,
2544                    "Considering stale-if-error"
2545                );
2546                return config.stale_if_error_secs > 0;
2547            }
2548        }
2549
2550        // During stale-while-revalidate (error is None)
2551        if error.is_none() && config.stale_while_revalidate_secs > 0 {
2552            trace!(
2553                correlation_id = %ctx.trace_id,
2554                route_id = %route_id,
2555                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2556                "Allowing stale-while-revalidate"
2557            );
2558            return true;
2559        }
2560
2561        false
2562    }
2563
2564    /// Handle Range header for byte-range requests (streaming support).
2565    ///
2566    /// This method is called when a Range header is present in the request.
2567    /// It allows proper handling of:
2568    /// - Video streaming (HTML5 video seeking)
2569    /// - Large file downloads with resume support
2570    /// - Partial content delivery
2571    ///
2572    /// Uses Pingora's built-in range handling with route-specific logging.
2573    fn range_header_filter(
2574        &self,
2575        session: &mut Session,
2576        response: &mut ResponseHeader,
2577        ctx: &mut Self::CTX,
2578    ) -> pingora_proxy::RangeType
2579    where
2580        Self::CTX: Send + Sync,
2581    {
2582        // Check if route supports range requests
2583        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2584            // Static file routes and media routes should support range requests
2585            matches!(
2586                config.service_type,
2587                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2588            )
2589        });
2590
2591        if !supports_range {
2592            trace!(
2593                correlation_id = %ctx.trace_id,
2594                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2595                "Range request not supported for this route type"
2596            );
2597            return pingora_proxy::RangeType::None;
2598        }
2599
2600        // Use Pingora's built-in range header parsing and handling
2601        let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
2602
2603        match &range_type {
2604            pingora_proxy::RangeType::None => {
2605                trace!(
2606                    correlation_id = %ctx.trace_id,
2607                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2608                    "No range request or not applicable"
2609                );
2610            }
2611            pingora_proxy::RangeType::Single(range) => {
2612                trace!(
2613                    correlation_id = %ctx.trace_id,
2614                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2615                    range_start = range.start,
2616                    range_end = range.end,
2617                    "Processing single-range request"
2618                );
2619            }
2620            pingora_proxy::RangeType::Multi(multi) => {
2621                trace!(
2622                    correlation_id = %ctx.trace_id,
2623                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2624                    range_count = multi.ranges.len(),
2625                    "Processing multi-range request"
2626                );
2627            }
2628            pingora_proxy::RangeType::Invalid => {
2629                debug!(
2630                    correlation_id = %ctx.trace_id,
2631                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2632                    "Invalid range header"
2633                );
2634            }
2635        }
2636
2637        range_type
2638    }
2639
2640    /// Handle fatal proxy errors by generating custom error pages.
2641    /// Called when the proxy itself fails to process the request.
2642    async fn fail_to_proxy(
2643        &self,
2644        session: &mut Session,
2645        e: &Error,
2646        ctx: &mut Self::CTX,
2647    ) -> pingora_proxy::FailToProxy
2648    where
2649        Self::CTX: Send + Sync,
2650    {
2651        let error_code = match e.etype() {
2652            // Connection errors
2653            ErrorType::ConnectRefused => 503,
2654            ErrorType::ConnectTimedout => 504,
2655            ErrorType::ConnectNoRoute => 502,
2656
2657            // Timeout errors
2658            ErrorType::ReadTimedout => 504,
2659            ErrorType::WriteTimedout => 504,
2660
2661            // TLS errors
2662            ErrorType::TLSHandshakeFailure => 502,
2663            ErrorType::InvalidCert => 502,
2664
2665            // Protocol errors
2666            ErrorType::InvalidHTTPHeader => 400,
2667            ErrorType::H2Error => 502,
2668
2669            // Resource errors
2670            ErrorType::ConnectProxyFailure => 502,
2671            ErrorType::ConnectionClosed => 502,
2672
2673            // Explicit HTTP status (e.g., from agent fail-closed blocking)
2674            ErrorType::HTTPStatus(status) => *status,
2675
2676            // Internal errors - return 502 for upstream issues (more accurate than 500)
2677            ErrorType::InternalError => {
2678                // Check if this is an upstream-related error
2679                let error_str = e.to_string();
2680                if error_str.contains("upstream")
2681                    || error_str.contains("DNS")
2682                    || error_str.contains("resolve")
2683                {
2684                    502
2685                } else {
2686                    500
2687                }
2688            }
2689
2690            // Default to 502 for unknown errors
2691            _ => 502,
2692        };
2693
2694        error!(
2695            correlation_id = %ctx.trace_id,
2696            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2697            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2698            error_type = ?e.etype(),
2699            error = %e,
2700            error_code = error_code,
2701            "Proxy error occurred"
2702        );
2703
2704        // Record the error in metrics
2705        self.metrics
2706            .record_blocked_request(&format!("proxy_error_{}", error_code));
2707
2708        // Write error response to ensure client receives a proper HTTP response
2709        // This is necessary because some errors occur before the upstream connection
2710        // is established, and Pingora may not send a response automatically
2711        let error_message = match error_code {
2712            400 => "Bad Request",
2713            502 => "Bad Gateway",
2714            503 => "Service Unavailable",
2715            504 => "Gateway Timeout",
2716            _ => "Internal Server Error",
2717        };
2718
2719        // Build a minimal error response body
2720        let body = format!(
2721            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2722            error_code, error_message, ctx.trace_id
2723        );
2724
2725        // Write the response header
2726        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
2727        header
2728            .insert_header("Content-Type", "application/json")
2729            .ok();
2730        header
2731            .insert_header("Content-Length", body.len().to_string())
2732            .ok();
2733        header
2734            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2735            .ok();
2736        header.insert_header("Connection", "close").ok();
2737
2738        // Write headers and body
2739        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2740            warn!(
2741                correlation_id = %ctx.trace_id,
2742                error = %write_err,
2743                "Failed to write error response header"
2744            );
2745        } else {
2746            // Write the body
2747            if let Err(write_err) = session
2748                .write_response_body(Some(bytes::Bytes::from(body)), true)
2749                .await
2750            {
2751                warn!(
2752                    correlation_id = %ctx.trace_id,
2753                    error = %write_err,
2754                    "Failed to write error response body"
2755                );
2756            }
2757        }
2758
2759        // Return the error response info
2760        // can_reuse_downstream: false since we already wrote and closed the response
2761        pingora_proxy::FailToProxy {
2762            error_code,
2763            can_reuse_downstream: false,
2764        }
2765    }
2766
2767    /// Handle errors that occur during proxying after upstream connection is established.
2768    ///
2769    /// This method enables retry logic and circuit breaker integration.
2770    /// It's called when an error occurs during the request/response exchange
2771    /// with the upstream server.
2772    fn error_while_proxy(
2773        &self,
2774        peer: &HttpPeer,
2775        session: &mut Session,
2776        e: Box<Error>,
2777        ctx: &mut Self::CTX,
2778        client_reused: bool,
2779    ) -> Box<Error> {
2780        let error_type = e.etype().clone();
2781        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2782
2783        // Classify error for retry decisions
2784        let is_retryable = matches!(
2785            error_type,
2786            ErrorType::ConnectTimedout
2787                | ErrorType::ReadTimedout
2788                | ErrorType::WriteTimedout
2789                | ErrorType::ConnectionClosed
2790                | ErrorType::ConnectRefused
2791        );
2792
2793        // Log the error with context
2794        warn!(
2795            correlation_id = %ctx.trace_id,
2796            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2797            upstream = %upstream_id,
2798            peer_address = %peer.address(),
2799            error_type = ?error_type,
2800            error = %e,
2801            client_reused = client_reused,
2802            is_retryable = is_retryable,
2803            "Error during proxy operation"
2804        );
2805
2806        // Record failure with circuit breaker via upstream pool
2807        // This is done asynchronously since we can't await in a sync fn
2808        let peer_address = peer.address().to_string();
2809        let upstream_pools = self.upstream_pools.clone();
2810        let upstream_id_owned = upstream_id.to_string();
2811        tokio::spawn(async move {
2812            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2813                pool.report_result(&peer_address, false).await;
2814            }
2815        });
2816
2817        // Metrics tracking
2818        self.metrics
2819            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2820
2821        // Create enhanced error with retry information
2822        let mut enhanced_error = e.more_context(format!(
2823            "Upstream: {}, Peer: {}, Attempts: {}",
2824            upstream_id,
2825            peer.address(),
2826            ctx.upstream_attempts
2827        ));
2828
2829        // Determine if retry should be attempted:
2830        // - Only retry if error is retryable type
2831        // - Only retry reused connections if buffer isn't truncated
2832        // - Track retry metrics
2833        if is_retryable {
2834            let can_retry = if client_reused {
2835                // For reused connections, check if retry buffer is intact
2836                !session.as_ref().retry_buffer_truncated()
2837            } else {
2838                // Fresh connections can always retry
2839                true
2840            };
2841
2842            enhanced_error.retry.decide_reuse(can_retry);
2843
2844            if can_retry {
2845                debug!(
2846                    correlation_id = %ctx.trace_id,
2847                    upstream = %upstream_id,
2848                    error_type = ?error_type,
2849                    "Error is retryable, will attempt retry"
2850                );
2851            }
2852        } else {
2853            // Non-retryable error - don't retry
2854            enhanced_error.retry.decide_reuse(false);
2855        }
2856
2857        enhanced_error
2858    }
2859
    /// End-of-request hook, invoked once per request after the response (or a
    /// terminal error) has been handled. Responsibilities, in order:
    /// 1. release the in-flight request count held by the reload coordinator;
    /// 2. fire a deferred shadow request if body buffering postponed it;
    /// 3. report latency/outcome to the adaptive load balancer and the model
    ///    warmth tracker;
    /// 4. reconcile inference token accounting (streaming finalization, PII
    ///    guardrail, actual-vs-estimated tokens, budget alerts, cost);
    /// 5. emit the access log entry, debug/WebSocket logs, and close the
    ///    OpenTelemetry span.
    /// `_error` is the terminal proxy error, if any; it is only surfaced in the
    /// debug-level completion log.
    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
        // Decrement active requests so config reloads can drain safely.
        self.reload_coordinator.dec_requests();

        // === Fire pending shadow request (if body buffering was enabled) ===
        // When the shadow needed the request body, sending was deferred until
        // the body was fully buffered; this is the last chance to send it.
        if !ctx.shadow_sent {
            if let Some(shadow_pending) = ctx.shadow_pending.take() {
                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
                    // Clone the buffered body for the shadow request
                    Some(ctx.body_buffer.clone())
                } else {
                    None
                };

                trace!(
                    correlation_id = %ctx.trace_id,
                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
                    "Firing deferred shadow request with buffered body"
                );

                shadow_pending.manager.shadow_request(
                    shadow_pending.headers,
                    body,
                    shadow_pending.request_ctx,
                );
                ctx.shadow_sent = true;
            }
        }

        let duration = ctx.elapsed();

        // Get response status; 0 means no response was ever written
        // (e.g. the connection failed before a response header went out).
        let status = session
            .response_written()
            .map(|r| r.status.as_u16())
            .unwrap_or(0);

        // Report result to load balancer for adaptive LB feedback
        // This enables latency-aware weight adjustment
        if let (Some(ref peer_addr), Some(ref upstream_id)) =
            (&ctx.selected_upstream_address, &ctx.upstream)
        {
            // Success = status code < 500 (client errors are not upstream failures)
            let success = status > 0 && status < 500;

            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
                pool.report_result_with_latency(peer_addr, success, Some(duration))
                    .await;
                trace!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_id,
                    peer_address = %peer_addr,
                    success = success,
                    duration_ms = duration.as_millis(),
                    status = status,
                    "Reported result to adaptive load balancer"
                );
            }

            // Track warmth for inference routes (cold model detection).
            // Only successful requests feed the tracker, so error latencies
            // don't skew the cold-start heuristic.
            if ctx.inference_rate_limit_enabled && success {
                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
                if cold_detected {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_id,
                        peer_address = %peer_addr,
                        duration_ms = duration.as_millis(),
                        "Cold model detected on inference upstream"
                    );
                }
            }
        }

        // Record actual token usage for inference rate limiting
        // This adjusts the token bucket based on actual vs estimated tokens
        if ctx.inference_rate_limit_enabled {
            if let (Some(route_id), Some(ref rate_limit_key)) =
                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
            {
                // Try to extract actual tokens from response headers
                let response_headers = session
                    .response_written()
                    .map(|r| &r.headers)
                    .cloned()
                    .unwrap_or_default();

                // For streaming responses, finalize the streaming token counter
                // (accumulated SSE content is tokenized/settled here).
                let streaming_result = if ctx.inference_streaming_response {
                    ctx.inference_streaming_counter
                        .as_ref()
                        .map(|counter| counter.finalize())
                } else {
                    None
                };

                // Log streaming token count info
                if let Some(ref result) = streaming_result {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        output_tokens = result.output_tokens,
                        input_tokens = ?result.input_tokens,
                        source = ?result.source,
                        content_length = result.content_length,
                        "Finalized streaming token count"
                    );
                }

                // PII detection guardrail (for streaming inference responses).
                // Runs post-response: the content has already been sent to the
                // client, so this path only records/alerts — it cannot redact.
                if ctx.inference_streaming_response {
                    if let Some(ref route_config) = ctx.route_config {
                        if let Some(ref inference) = route_config.inference {
                            if let Some(ref guardrails) = inference.guardrails {
                                if let Some(ref pii_config) = guardrails.pii_detection {
                                    if pii_config.enabled {
                                        // Get accumulated content from streaming counter
                                        if let Some(ref counter) = ctx.inference_streaming_counter {
                                            let response_content = counter.content();
                                            if !response_content.is_empty() {
                                                let pii_result = self
                                                    .guardrail_processor
                                                    .check_pii(
                                                        pii_config,
                                                        response_content,
                                                        ctx.route_id.as_deref(),
                                                        &ctx.trace_id,
                                                    )
                                                    .await;

                                                match pii_result {
                                                    crate::inference::PiiCheckResult::Detected {
                                                        detections,
                                                        redacted_content: _,
                                                    } => {
                                                        warn!(
                                                            correlation_id = %ctx.trace_id,
                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                                                            detection_count = detections.len(),
                                                            "PII detected in inference response"
                                                        );

                                                        // Store detection categories for logging
                                                        ctx.pii_detection_categories = detections
                                                            .iter()
                                                            .map(|d| d.category.clone())
                                                            .collect();

                                                        // Record metrics for each category
                                                        for detection in &detections {
                                                            self.metrics.record_pii_detected(
                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
                                                                &detection.category,
                                                            );
                                                        }
                                                    }
                                                    crate::inference::PiiCheckResult::Clean => {
                                                        trace!(
                                                            correlation_id = %ctx.trace_id,
                                                            "No PII detected in response"
                                                        );
                                                    }
                                                    crate::inference::PiiCheckResult::Error { message } => {
                                                        // Detection errors are non-fatal: log and move on.
                                                        debug!(
                                                            correlation_id = %ctx.trace_id,
                                                            error = %message,
                                                            "PII detection check failed"
                                                        );
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                // Response body would require buffering, which is expensive
                // For non-streaming, most LLM APIs provide token counts in headers
                // For streaming, we use the accumulated SSE content
                let empty_body: &[u8] = &[];

                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
                    route_id,
                    rate_limit_key,
                    &response_headers,
                    empty_body,
                    ctx.inference_estimated_tokens,
                ) {
                    // Use streaming result if available and header extraction failed
                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result
                    {
                        // Prefer API-provided counts from streaming, otherwise use tiktoken count
                        if streaming.total_tokens.is_some() {
                            (streaming.total_tokens.unwrap(), "streaming_api")
                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated
                        {
                            // Header extraction failed, use streaming tiktoken count
                            // Estimate total by adding input estimate + output from streaming
                            let total = ctx.inference_input_tokens + streaming.output_tokens;
                            (total, "streaming_tiktoken")
                        } else {
                            (actual_estimate.tokens, "headers")
                        }
                    } else {
                        (actual_estimate.tokens, "headers")
                    };

                    ctx.inference_actual_tokens = Some(actual_tokens);

                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = route_id,
                        estimated_tokens = ctx.inference_estimated_tokens,
                        actual_tokens = actual_tokens,
                        source = source_info,
                        streaming_response = ctx.inference_streaming_response,
                        model = ?ctx.inference_model,
                        "Recorded actual inference tokens"
                    );

                    // Record budget usage with actual tokens (if budget tracking enabled)
                    if ctx.inference_budget_enabled {
                        let alerts = self.inference_rate_limit_manager.record_budget(
                            route_id,
                            rate_limit_key,
                            actual_tokens,
                        );

                        // Log any budget alerts that fired
                        for alert in alerts.iter() {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                tenant = %alert.tenant,
                                threshold_pct = alert.threshold * 100.0,
                                tokens_used = alert.tokens_used,
                                tokens_limit = alert.tokens_limit,
                                "Token budget alert threshold crossed"
                            );
                        }

                        // Update context with remaining budget
                        if let Some(status) = self
                            .inference_rate_limit_manager
                            .budget_status(route_id, rate_limit_key)
                        {
                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
                        }
                    }

                    // Calculate cost if cost attribution is enabled
                    if ctx.inference_cost_enabled {
                        if let Some(model) = ctx.inference_model.as_deref() {
                            // Use streaming result for more accurate input/output split if available
                            let (input_tokens, output_tokens) = if let Some(ref streaming) =
                                streaming_result
                            {
                                // Streaming gives us accurate output tokens
                                let input =
                                    streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
                                let output = streaming.output_tokens;
                                (input, output)
                            } else {
                                // Fallback: estimate output from total - input
                                let input = ctx.inference_input_tokens;
                                let output = actual_tokens.saturating_sub(input);
                                (input, output)
                            };
                            ctx.inference_output_tokens = output_tokens;

                            if let Some(cost_result) = self
                                .inference_rate_limit_manager
                                .calculate_cost(route_id, model, input_tokens, output_tokens)
                            {
                                ctx.inference_request_cost = Some(cost_result.total_cost);

                                trace!(
                                    correlation_id = %ctx.trace_id,
                                    route_id = route_id,
                                    model = model,
                                    input_tokens = input_tokens,
                                    output_tokens = output_tokens,
                                    total_cost = cost_result.total_cost,
                                    currency = %cost_result.currency,
                                    "Calculated inference request cost"
                                );
                            }
                        }
                    }
                }
            }
        }

        // Write to access log file if configured (check sampling before allocating entry)
        if self.log_manager.should_log_access(status) {
            let access_entry = AccessLogEntry {
                timestamp: chrono::Utc::now().to_rfc3339(),
                trace_id: ctx.trace_id.clone(),
                method: ctx.method.clone(),
                path: ctx.path.clone(),
                query: ctx.query.clone(),
                // NOTE(review): protocol is hard-coded; HTTP/2 requests will
                // also be logged as HTTP/1.1 — confirm whether that's intended.
                protocol: "HTTP/1.1".to_string(),
                status,
                body_bytes: ctx.response_bytes,
                duration_ms: duration.as_millis() as u64,
                client_ip: ctx.client_ip.clone(),
                user_agent: ctx.user_agent.clone(),
                referer: ctx.referer.clone(),
                host: ctx.host.clone(),
                route_id: ctx.route_id.clone(),
                upstream: ctx.upstream.clone(),
                upstream_attempts: ctx.upstream_attempts,
                instance_id: self.app_state.instance_id.clone(),
                namespace: ctx.namespace.clone(),
                service: ctx.service.clone(),
                // New fields (body_bytes_sent duplicates body_bytes for
                // nginx-style log compatibility)
                body_bytes_sent: ctx.response_bytes,
                upstream_addr: ctx.selected_upstream_address.clone(),
                connection_reused: ctx.connection_reused,
                rate_limit_hit: status == 429,
                geo_country: ctx.geo_country_code.clone(),
            };
            self.log_manager.log_access(&access_entry);
        }

        // Log to tracing at debug level (avoid allocations if debug disabled)
        if tracing::enabled!(tracing::Level::DEBUG) {
            debug!(
                trace_id = %ctx.trace_id,
                method = %ctx.method,
                path = %ctx.path,
                route_id = ?ctx.route_id,
                upstream = ?ctx.upstream,
                status = status,
                duration_ms = duration.as_millis() as u64,
                upstream_attempts = ctx.upstream_attempts,
                error = ?_error.map(|e| e.to_string()),
                "Request completed"
            );
        }

        // Log WebSocket upgrades at info level (101 Switching Protocols)
        if ctx.is_websocket_upgrade && status == 101 {
            info!(
                trace_id = %ctx.trace_id,
                route_id = ?ctx.route_id,
                upstream = ?ctx.upstream,
                client_ip = %ctx.client_ip,
                "WebSocket connection established"
            );
        }

        // End OpenTelemetry span (take() ensures it is only ended once)
        if let Some(span) = ctx.otel_span.take() {
            span.end();
        }
    }
3219}
3220
3221// =============================================================================
3222// Helper methods for body streaming (not part of ProxyHttp trait)
3223// =============================================================================
3224
3225impl SentinelProxy {
3226    /// Process a single body chunk in streaming mode.
3227    async fn process_body_chunk_streaming(
3228        &self,
3229        body: &mut Option<Bytes>,
3230        end_of_stream: bool,
3231        ctx: &mut RequestContext,
3232    ) -> Result<(), Box<Error>> {
3233        // Clone the chunk data to avoid borrowing issues when mutating body later
3234        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3235        let chunk_index = ctx.request_body_chunk_index;
3236        ctx.request_body_chunk_index += 1;
3237        ctx.body_bytes_inspected += chunk_data.len() as u64;
3238
3239        debug!(
3240            correlation_id = %ctx.trace_id,
3241            chunk_index = chunk_index,
3242            chunk_size = chunk_data.len(),
3243            end_of_stream = end_of_stream,
3244            "Streaming body chunk to agents"
3245        );
3246
3247        // Create agent call context
3248        let agent_ctx = crate::agents::AgentCallContext {
3249            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3250            metadata: sentinel_agent_protocol::RequestMetadata {
3251                correlation_id: ctx.trace_id.clone(),
3252                request_id: ctx.trace_id.clone(),
3253                client_ip: ctx.client_ip.clone(),
3254                client_port: 0,
3255                server_name: ctx.host.clone(),
3256                protocol: "HTTP/1.1".to_string(),
3257                tls_version: None,
3258                tls_cipher: None,
3259                route_id: ctx.route_id.clone(),
3260                upstream_id: ctx.upstream.clone(),
3261                timestamp: chrono::Utc::now().to_rfc3339(),
3262                traceparent: ctx.traceparent(),
3263            },
3264            route_id: ctx.route_id.clone(),
3265            upstream_id: ctx.upstream.clone(),
3266            request_body: None, // Not used in streaming mode
3267            response_body: None,
3268        };
3269
3270        let agent_ids = ctx.body_inspection_agents.clone();
3271        let total_size = None; // Unknown in streaming mode
3272
3273        match self
3274            .agent_manager
3275            .process_request_body_streaming(
3276                &agent_ctx,
3277                &chunk_data,
3278                end_of_stream,
3279                chunk_index,
3280                ctx.body_bytes_inspected as usize,
3281                total_size,
3282                &agent_ids,
3283            )
3284            .await
3285        {
3286            Ok(decision) => {
3287                // Track if agent needs more data
3288                ctx.agent_needs_more = decision.needs_more;
3289
3290                // Apply body mutation if present
3291                if let Some(ref mutation) = decision.request_body_mutation {
3292                    if !mutation.is_pass_through() {
3293                        if mutation.is_drop() {
3294                            // Drop the chunk
3295                            *body = None;
3296                            trace!(
3297                                correlation_id = %ctx.trace_id,
3298                                chunk_index = chunk_index,
3299                                "Agent dropped body chunk"
3300                            );
3301                        } else if let Some(ref new_data) = mutation.data {
3302                            // Replace chunk with mutated content
3303                            *body = Some(Bytes::from(new_data.clone()));
3304                            trace!(
3305                                correlation_id = %ctx.trace_id,
3306                                chunk_index = chunk_index,
3307                                original_size = chunk_data.len(),
3308                                new_size = new_data.len(),
3309                                "Agent mutated body chunk"
3310                            );
3311                        }
3312                    }
3313                }
3314
3315                // Check decision (only final if needs_more is false)
3316                if !decision.needs_more && !decision.is_allow() {
3317                    warn!(
3318                        correlation_id = %ctx.trace_id,
3319                        action = ?decision.action,
3320                        "Agent blocked request body"
3321                    );
3322                    self.metrics.record_blocked_request("agent_body_inspection");
3323
3324                    let (status, message) = match &decision.action {
3325                        crate::agents::AgentAction::Block { status, body, .. } => (
3326                            *status,
3327                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3328                        ),
3329                        _ => (403, "Forbidden".to_string()),
3330                    };
3331
3332                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3333                }
3334
3335                trace!(
3336                    correlation_id = %ctx.trace_id,
3337                    needs_more = decision.needs_more,
3338                    "Agent processed body chunk"
3339                );
3340            }
3341            Err(e) => {
3342                let fail_closed = ctx
3343                    .route_config
3344                    .as_ref()
3345                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3346                    .unwrap_or(false);
3347
3348                if fail_closed {
3349                    error!(
3350                        correlation_id = %ctx.trace_id,
3351                        error = %e,
3352                        "Agent streaming body inspection failed, blocking (fail-closed)"
3353                    );
3354                    return Err(Error::explain(
3355                        ErrorType::HTTPStatus(503),
3356                        "Service unavailable",
3357                    ));
3358                } else {
3359                    warn!(
3360                        correlation_id = %ctx.trace_id,
3361                        error = %e,
3362                        "Agent streaming body inspection failed, allowing (fail-open)"
3363                    );
3364                }
3365            }
3366        }
3367
3368        Ok(())
3369    }
3370
3371    /// Send buffered body to agents (buffer mode).
3372    async fn send_buffered_body_to_agents(
3373        &self,
3374        end_of_stream: bool,
3375        ctx: &mut RequestContext,
3376    ) -> Result<(), Box<Error>> {
3377        debug!(
3378            correlation_id = %ctx.trace_id,
3379            buffer_size = ctx.body_buffer.len(),
3380            end_of_stream = end_of_stream,
3381            agent_count = ctx.body_inspection_agents.len(),
3382            decompression_enabled = ctx.decompression_enabled,
3383            "Sending buffered body to agents for inspection"
3384        );
3385
3386        // Decompress body if enabled and we have a supported encoding
3387        let body_for_inspection = if ctx.decompression_enabled {
3388            if let Some(ref encoding) = ctx.body_content_encoding {
3389                let config = crate::decompression::DecompressionConfig {
3390                    max_ratio: ctx.max_decompression_ratio,
3391                    max_output_bytes: ctx.max_decompression_bytes,
3392                };
3393
3394                match crate::decompression::decompress_body(&ctx.body_buffer, encoding, &config) {
3395                    Ok(result) => {
3396                        ctx.body_was_decompressed = true;
3397                        self.metrics
3398                            .record_decompression_success(encoding, result.ratio);
3399                        debug!(
3400                            correlation_id = %ctx.trace_id,
3401                            encoding = %encoding,
3402                            compressed_size = result.compressed_size,
3403                            decompressed_size = result.decompressed_size,
3404                            ratio = result.ratio,
3405                            "Body decompressed for agent inspection"
3406                        );
3407                        result.data
3408                    }
3409                    Err(e) => {
3410                        // Record failure metric
3411                        let failure_reason = match &e {
3412                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
3413                                "ratio_exceeded"
3414                            }
3415                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
3416                                "size_exceeded"
3417                            }
3418                            crate::decompression::DecompressionError::InvalidData { .. } => {
3419                                "invalid_data"
3420                            }
3421                            crate::decompression::DecompressionError::UnsupportedEncoding {
3422                                ..
3423                            } => "unsupported",
3424                            crate::decompression::DecompressionError::IoError(_) => "io_error",
3425                        };
3426                        self.metrics
3427                            .record_decompression_failure(encoding, failure_reason);
3428
3429                        // Decompression failed - decide based on failure mode
3430                        let fail_closed = ctx
3431                            .route_config
3432                            .as_ref()
3433                            .map(|r| {
3434                                r.policies.failure_mode == sentinel_config::FailureMode::Closed
3435                            })
3436                            .unwrap_or(false);
3437
3438                        if fail_closed {
3439                            error!(
3440                                correlation_id = %ctx.trace_id,
3441                                error = %e,
3442                                encoding = %encoding,
3443                                "Decompression failed, blocking (fail-closed)"
3444                            );
3445                            return Err(Error::explain(
3446                                ErrorType::HTTPStatus(400),
3447                                "Invalid compressed body",
3448                            ));
3449                        } else {
3450                            warn!(
3451                                correlation_id = %ctx.trace_id,
3452                                error = %e,
3453                                encoding = %encoding,
3454                                "Decompression failed, sending compressed body (fail-open)"
3455                            );
3456                            ctx.body_buffer.clone()
3457                        }
3458                    }
3459                }
3460            } else {
3461                ctx.body_buffer.clone()
3462            }
3463        } else {
3464            ctx.body_buffer.clone()
3465        };
3466
3467        let agent_ctx = crate::agents::AgentCallContext {
3468            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3469            metadata: sentinel_agent_protocol::RequestMetadata {
3470                correlation_id: ctx.trace_id.clone(),
3471                request_id: ctx.trace_id.clone(),
3472                client_ip: ctx.client_ip.clone(),
3473                client_port: 0,
3474                server_name: ctx.host.clone(),
3475                protocol: "HTTP/1.1".to_string(),
3476                tls_version: None,
3477                tls_cipher: None,
3478                route_id: ctx.route_id.clone(),
3479                upstream_id: ctx.upstream.clone(),
3480                timestamp: chrono::Utc::now().to_rfc3339(),
3481                traceparent: ctx.traceparent(),
3482            },
3483            route_id: ctx.route_id.clone(),
3484            upstream_id: ctx.upstream.clone(),
3485            request_body: Some(body_for_inspection.clone()),
3486            response_body: None,
3487        };
3488
3489        let agent_ids = ctx.body_inspection_agents.clone();
3490        match self
3491            .agent_manager
3492            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3493            .await
3494        {
3495            Ok(decision) => {
3496                if !decision.is_allow() {
3497                    warn!(
3498                        correlation_id = %ctx.trace_id,
3499                        action = ?decision.action,
3500                        "Agent blocked request body"
3501                    );
3502                    self.metrics.record_blocked_request("agent_body_inspection");
3503
3504                    let (status, message) = match &decision.action {
3505                        crate::agents::AgentAction::Block { status, body, .. } => (
3506                            *status,
3507                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3508                        ),
3509                        _ => (403, "Forbidden".to_string()),
3510                    };
3511
3512                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3513                }
3514
3515                trace!(
3516                    correlation_id = %ctx.trace_id,
3517                    "Agent allowed request body"
3518                );
3519            }
3520            Err(e) => {
3521                let fail_closed = ctx
3522                    .route_config
3523                    .as_ref()
3524                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3525                    .unwrap_or(false);
3526
3527                if fail_closed {
3528                    error!(
3529                        correlation_id = %ctx.trace_id,
3530                        error = %e,
3531                        "Agent body inspection failed, blocking (fail-closed)"
3532                    );
3533                    return Err(Error::explain(
3534                        ErrorType::HTTPStatus(503),
3535                        "Service unavailable",
3536                    ));
3537                } else {
3538                    warn!(
3539                        correlation_id = %ctx.trace_id,
3540                        error = %e,
3541                        "Agent body inspection failed, allowing (fail-open)"
3542                    );
3543                }
3544            }
3545        }
3546
3547        Ok(())
3548    }
3549}