sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.
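//!
//! Lifecycle phases implemented here, in request order: `early_request_filter`
//! (builtin-route short-circuiting and trace-context setup), `upstream_peer`
//! (route matching, model-based routing, fallback evaluation, and peer selection),
//! and `request_filter` (rate limiting, inference guardrails, geo filtering,
//! WebSocket upgrade handling, and static/builtin dispatch).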

use async_trait::async_trait;
use bytes::Bytes;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora::protocols::Digest;
use pingora::proxy::{ProxyHttp, Session};
use pingora::upstreams::peer::Peer;
use pingora_cache::{
    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
};
use pingora_timeout::sleep;
use std::os::unix::io::RawFd;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
use crate::inference::{
    extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
};
use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
use crate::rate_limit::HeaderAccessor;
use crate::routing::RequestInfo;

use super::context::{FallbackReason, RequestContext};
use super::fallback::FallbackEvaluator;
use super::fallback_metrics::get_fallback_metrics;
use super::model_routing;
use super::model_routing_metrics::get_model_routing_metrics;
use super::SentinelProxy;

/// Helper type for rate limiting when we don't need header access
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}
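// For per-API-key or per-tenant limits, a header-backed accessor could be passed
// to the rate limiter instead. A minimal sketch (hypothetical; the call sites in
// this file only use `NoHeaderAccessor`), assuming a borrowed Pingora request header:
//
//     struct ReqHeaderAccessor<'a>(&'a pingora::http::RequestHeader);
//     impl HeaderAccessor for ReqHeaderAccessor<'_> {
//         fn get_header(&self, name: &str) -> Option<String> {
//             self.0
//                 .headers
//                 .get(name)
//                 .and_then(|v| v.to_str().ok())
//                 .map(|s| s.to_string())
//         }
//     }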

#[async_trait]
impl ProxyHttp for SentinelProxy {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::new()
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        error!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
            peer_address = %peer.address(),
            error = %e,
            "Failed to connect to upstream peer"
        );
        // Custom error pages are handled in response_filter
        e
    }

    /// Early request filter - runs before upstream selection
    /// Used to handle builtin routes that don't need an upstream connection
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Parse incoming W3C trace context if present
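        // A traceparent value has the W3C Trace Context form
        // "version-traceid-parentid-flags", e.g.
        // "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".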
        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
            if let Ok(s) = traceparent.to_str() {
                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
            }
        }

        // Start OpenTelemetry request span if tracing is enabled
        if let Some(tracer) = crate::otel::get_tracer() {
            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
        }

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            // Set trace_id if not already set by early_request_filter
            if ctx.trace_id.is_empty() {
                ctx.trace_id = self.get_trace_id(session);

                // Parse incoming W3C trace context if present
                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER)
                {
                    if let Ok(s) = traceparent.to_str() {
                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
                    }
                }

                // Start OpenTelemetry request span if tracing is enabled
                if let Some(tracer) = crate::otel::get_tracer() {
                    ctx.otel_span = Some(tracer.start_span(
                        &ctx.method,
                        &ctx.path,
                        ctx.trace_context.as_ref(),
                    ));
                }
            }

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // === Model-based routing (for inference routes) ===
        // Check if model routing is configured and select upstream based on model
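        // As a rough illustration (config shape assumed, not taken from this file),
        // a route's model routing section might map model names to named upstreams
        // with an optional default, e.g.:
        //
        //     inference:
        //       model_routing:
        //         default_upstream: general-pool   # used when no model header is present
        //         # model -> upstream mapping consulted by find_upstream_for_model()
        //         models:
        //           llama-3-70b: gpu-pool
        //           gpt-4o: openai-pool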
        let mut model_routing_applied = false;
        if let Some(ref inference) = route_match.config.inference {
            if let Some(ref model_routing) = inference.model_routing {
                // Try to extract model from headers (fast path - no body parsing needed)
                let model = model_routing::extract_model_from_headers(&req_header.headers);

                if let Some(ref model_name) = model {
                    // Find upstream for this model
                    if let Some(routing_result) =
                        model_routing::find_upstream_for_model(model_routing, model_name)
                    {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = %route_match.route_id,
                            model = %model_name,
                            upstream = %routing_result.upstream,
                            is_default = routing_result.is_default,
                            provider_override = ?routing_result.provider,
                            "Model-based routing selected upstream"
                        );

                        ctx.record_model_routing(
                            &routing_result.upstream,
                            Some(model_name.clone()),
                            routing_result.provider,
                        );
                        model_routing_applied = true;

                        // Record metrics
                        if let Some(metrics) = get_model_routing_metrics() {
                            metrics.record_model_routed(
                                route_match.route_id.as_str(),
                                model_name,
                                &routing_result.upstream,
                            );
                            if routing_result.is_default {
                                metrics.record_default_upstream(route_match.route_id.as_str());
                            }
                            if let Some(provider) = routing_result.provider {
                                metrics.record_provider_override(
                                    route_match.route_id.as_str(),
                                    &routing_result.upstream,
                                    provider.as_str(),
                                );
                            }
                        }
                    }
                } else if let Some(ref default_upstream) = model_routing.default_upstream {
                    // No model in headers, use default upstream
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_match.route_id,
                        upstream = %default_upstream,
                        "Model-based routing using default upstream (no model header)"
                    );
                    ctx.record_model_routing(default_upstream, None, None);
                    model_routing_applied = true;

                    // Record metrics for no model header case
                    if let Some(metrics) = get_model_routing_metrics() {
                        metrics.record_no_model_header(route_match.route_id.as_str());
                    }
                }
            }
        }

        // Regular route with upstream (if model routing didn't set it)
        if !model_routing_applied {
            if let Some(ref upstream) = route_match.config.upstream {
                ctx.upstream = Some(upstream.clone());
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    upstream = %upstream,
                    "Upstream configured for route"
                );
            } else {
                error!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    "Route has no upstream configured"
                );
                return Err(Error::explain(
                    ErrorType::InternalError,
                    format!(
                        "Route '{}' has no upstream configured",
                        route_match.route_id
                    ),
                ));
            }
        }

        // === Fallback routing evaluation (pre-request) ===
        // Check if fallback should be triggered due to health or budget conditions
        if let Some(ref fallback_config) = route_match.config.fallback {
            let upstream_name = ctx.upstream.as_ref().unwrap();

            // Check if primary upstream is healthy
            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
                pool.has_healthy_targets().await
            } else {
                false // Pool not found, treat as unhealthy
            };

            // Check if budget is exhausted (for inference routes)
            let is_budget_exhausted = ctx.inference_budget_exhausted;

            // Get model name for model mapping (inference routes)
            let current_model = ctx.inference_model.as_deref();

            // Create fallback evaluator
            let evaluator = FallbackEvaluator::new(
                fallback_config,
                ctx.tried_upstreams(),
                ctx.fallback_attempt,
            );

            // Evaluate pre-request fallback conditions
            if let Some(decision) = evaluator.should_fallback_before_request(
                upstream_name,
                is_healthy,
                is_budget_exhausted,
                current_model,
            ) {
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    from_upstream = %upstream_name,
                    to_upstream = %decision.next_upstream,
                    reason = %decision.reason,
                    fallback_attempt = ctx.fallback_attempt + 1,
                    "Triggering fallback routing"
                );

                // Record fallback metrics
                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_attempt(
                        route_match.route_id.as_str(),
                        upstream_name,
                        &decision.next_upstream,
                        &decision.reason,
                    );
                }

                // Record fallback in context
                ctx.record_fallback(decision.reason, &decision.next_upstream);

                // Apply model mapping if present
                if let Some((original, mapped)) = decision.model_mapping {
                    // Record model mapping metrics
                    if let Some(metrics) = get_fallback_metrics() {
                        metrics.record_model_mapping(
                            route_match.route_id.as_str(),
                            &original,
                            &mapped,
                        );
                    }

                    ctx.record_model_mapping(original, mapped);
                    trace!(
                        correlation_id = %ctx.trace_id,
                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
                        "Applied model mapping for fallback"
                    );
                }
            }
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer(None).await {
                Ok(peer) => {
                    let selection_duration = selection_start.elapsed();
                    // Store selected peer address for feedback reporting in logging()
                    let peer_addr = peer.address().to_string();
                    ctx.selected_upstream_address = Some(peer_addr.clone());
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer_addr,
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency)
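                        // With a 100 ms base and doubling per attempt, the delays
                        // are 100 ms, 200 ms, 400 ms, ... between retries.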
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        // Record exhausted metric if fallback was used but all upstreams failed
        if ctx.used_fallback() {
            if let Some(metrics) = get_fallback_metrics() {
                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
            }
        }

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }

    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool, Box<Error>> {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Starting request filter phase"
        );

        // Check rate limiting early (before other processing)
        // Fast path: skip if no rate limiting is configured for this route
        if let Some(route_id) = ctx.route_id.as_deref() {
            if self.rate_limit_manager.has_route_limiter(route_id) {
                let rate_result = self.rate_limit_manager.check(
                    route_id,
                    &ctx.client_ip,
                    &ctx.path,
                    Option::<&NoHeaderAccessor>::None,
                );

                // Store rate limit info for response headers (even if allowed)
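                // (These are presumably surfaced to clients as X-RateLimit-Limit /
                // X-RateLimit-Remaining / X-RateLimit-Reset style headers; the exact
                // header names are written elsewhere, not in this file.)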
                if rate_result.limit > 0 {
                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
                        limit: rate_result.limit,
                        remaining: rate_result.remaining,
                        reset_at: rate_result.reset_at,
                    });
                }

                if !rate_result.allowed {
                    use sentinel_config::RateLimitAction;

                    match rate_result.action {
                        RateLimitAction::Reject => {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                limiter = %rate_result.limiter,
                                limit = rate_result.limit,
                                remaining = rate_result.remaining,
                                "Request rate limited"
                            );
                            self.metrics.record_blocked_request("rate_limited");

                            // Audit log the rate limit
                            let audit_entry = AuditLogEntry::rate_limited(
                                &ctx.trace_id,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                                &rate_result.limiter,
                            )
                            .with_route_id(route_id)
                            .with_status_code(rate_result.status_code);
                            self.log_manager.log_audit(&audit_entry);

                            // Send rate limit response with headers
                            let body = rate_result
                                .message
                                .unwrap_or_else(|| "Rate limit exceeded".to_string());

                            // Build response with rate limit headers
                            let retry_after = rate_result.reset_at.saturating_sub(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_secs(),
                            );
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                rate_result.status_code,
                                &body,
                                rate_result.limit,
                                rate_result.remaining,
                                rate_result.reset_at,
                                retry_after,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }
                        RateLimitAction::LogOnly => {
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit exceeded (log only mode)"
                            );
                            // Continue processing
                        }
                        RateLimitAction::Delay => {
                            // Apply delay if suggested by rate limiter
                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
                                // Cap delay at the configured maximum
                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);

                                if actual_delay > 0 {
                                    debug!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        suggested_delay_ms = delay_ms,
                                        max_delay_ms = rate_result.max_delay_ms,
                                        actual_delay_ms = actual_delay,
                                        "Applying rate limit delay"
                                    );

                                    tokio::time::sleep(std::time::Duration::from_millis(
                                        actual_delay,
                                    ))
                                    .await;
                                }
                            }
                            // Continue processing after delay
                        }
                    }
                }
            }
        }

        // Inference rate limiting (token-based, for LLM/AI routes)
        // This runs after regular rate limiting and checks service type
        if let Some(route_id) = ctx.route_id.as_deref() {
            if let Some(ref route_config) = ctx.route_config {
                if route_config.service_type == sentinel_config::ServiceType::Inference
                    && self.inference_rate_limit_manager.has_route(route_id)
                {
                    // For inference rate limiting, we need access to the request body
                    // to estimate tokens. We'll use buffered body if available.
                    let headers = &session.req_header().headers;

                    // Try to get buffered body, or use empty (will estimate from headers only)
                    let body = ctx.body_buffer.as_slice();

                    // Use client IP as the rate limit key (could be enhanced to use API key header)
                    let rate_limit_key = &ctx.client_ip;

                    if let Some(check_result) = self.inference_rate_limit_manager.check(
                        route_id,
                        rate_limit_key,
                        headers,
                        body,
                    ) {
                        // Store inference rate limiting context for recording actual tokens later
                        ctx.inference_rate_limit_enabled = true;
                        ctx.inference_estimated_tokens = check_result.estimated_tokens;
                        ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
                        ctx.inference_model = check_result.model.clone();

                        if !check_result.is_allowed() {
                            let retry_after_ms = check_result.retry_after_ms();
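                            // Round the retry hint up to whole seconds (e.g. 1500 ms -> 2 s)
                            // for the retry-after value sent back to the client.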
                            let retry_after_secs = retry_after_ms.div_ceil(1000);

                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                estimated_tokens = check_result.estimated_tokens,
                                model = ?check_result.model,
                                retry_after_ms = retry_after_ms,
                                "Inference rate limit exceeded (tokens)"
                            );
                            self.metrics.record_blocked_request("inference_rate_limited");

                            // Audit log the token rate limit
                            let audit_entry = AuditLogEntry::new(
                                &ctx.trace_id,
                                AuditEventType::RateLimitExceeded,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                            )
                            .with_route_id(route_id)
                            .with_status_code(429)
                            .with_reason(format!(
                                "Token rate limit exceeded: estimated {} tokens, model={:?}",
                                check_result.estimated_tokens, check_result.model
                            ));
                            self.log_manager.log_audit(&audit_entry);

                            // Send 429 response with appropriate headers
                            let body = "Token rate limit exceeded";
                            let reset_at = std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or_default()
                                .as_secs()
                                + retry_after_secs;

                            // Use simplified error write for inference rate limit
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                429,
                                body,
                                0, // No request limit
                                0, // No remaining
                                reset_at,
                                retry_after_secs,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }

                        trace!(
                            correlation_id = %ctx.trace_id,
                            route_id = route_id,
                            estimated_tokens = check_result.estimated_tokens,
                            model = ?check_result.model,
                            "Inference rate limit check passed"
                        );

                        // Check budget tracking (cumulative per-period limits)
                        if self.inference_rate_limit_manager.has_budget(route_id) {
                            ctx.inference_budget_enabled = true;

                            if let Some(budget_result) = self.inference_rate_limit_manager.check_budget(
                                route_id,
                                rate_limit_key,
                                check_result.estimated_tokens,
                            ) {
                                if !budget_result.is_allowed() {
                                    let retry_after_secs = budget_result.retry_after_secs();

                                    warn!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        client_ip = %ctx.client_ip,
                                        estimated_tokens = check_result.estimated_tokens,
                                        retry_after_secs = retry_after_secs,
                                        "Token budget exhausted"
                                    );

                                    ctx.inference_budget_exhausted = true;
                                    self.metrics.record_blocked_request("budget_exhausted");

                                    // Audit log the budget exhaustion
                                    let audit_entry = AuditLogEntry::new(
                                        &ctx.trace_id,
                                        AuditEventType::RateLimitExceeded,
                                        &ctx.method,
                                        &ctx.path,
                                        &ctx.client_ip,
                                    )
                                    .with_route_id(route_id)
                                    .with_status_code(429)
                                    .with_reason("Token budget exhausted".to_string());
                                    self.log_manager.log_audit(&audit_entry);

                                    // Send 429 response with budget headers
                                    let body = "Token budget exhausted";
                                    let reset_at = std::time::SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .unwrap_or_default()
                                        .as_secs()
                                        + retry_after_secs;

                                    crate::http_helpers::write_rate_limit_error(
                                        session,
                                        429,
                                        body,
                                        0,
                                        0,
                                        reset_at,
                                        retry_after_secs,
                                    )
                                    .await?;
                                    return Ok(true);
                                }

                                // Capture budget status for response headers
                                let remaining = match &budget_result {
                                    sentinel_common::budget::BudgetCheckResult::Allowed { remaining } => *remaining as i64,
                                    sentinel_common::budget::BudgetCheckResult::Soft { remaining, .. } => *remaining,
                                    _ => 0,
                                };
                                ctx.inference_budget_remaining = Some(remaining);

                                // Get period reset time from budget status
                                if let Some(status) = self.inference_rate_limit_manager.budget_status(
                                    route_id,
                                    rate_limit_key,
                                ) {
                                    ctx.inference_budget_period_reset = Some(status.period_end);
                                }

                                trace!(
                                    correlation_id = %ctx.trace_id,
                                    route_id = route_id,
                                    budget_remaining = remaining,
                                    "Token budget check passed"
                                );
                            }
                        }

                        // Check if cost attribution is enabled
                        if self.inference_rate_limit_manager.has_cost_attribution(route_id) {
                            ctx.inference_cost_enabled = true;
                        }
                    }
                }
            }
        }

        // Prompt injection guardrail (for inference routes)
        if let Some(ref route_config) = ctx.route_config {
            if let Some(ref inference) = route_config.inference {
                if let Some(ref guardrails) = inference.guardrails {
                    if let Some(ref pi_config) = guardrails.prompt_injection {
                        if pi_config.enabled && !ctx.body_buffer.is_empty() {
                            ctx.guardrails_enabled = true;

                            // Extract content from request body
                            if let Some(content) = extract_inference_content(&ctx.body_buffer) {
                                let result = self
                                    .guardrail_processor
                                    .check_prompt_injection(
                                        pi_config,
                                        &content,
                                        ctx.inference_model.as_deref(),
                                        ctx.route_id.as_deref(),
                                        &ctx.trace_id,
                                    )
                                    .await;

                                match result {
                                    PromptInjectionResult::Blocked {
                                        status,
                                        message,
                                        detections,
                                    } => {
                                        warn!(
                                            correlation_id = %ctx.trace_id,
                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                                            detection_count = detections.len(),
                                            "Prompt injection detected, blocking request"
                                        );

                                        self.metrics.record_blocked_request("prompt_injection");

                                        // Store detection categories for logging
                                        ctx.guardrail_detection_categories = detections
                                            .iter()
                                            .map(|d| d.category.clone())
                                            .collect();

                                        // Audit log the block
                                        let audit_entry = AuditLogEntry::new(
                                            &ctx.trace_id,
                                            AuditEventType::Blocked,
                                            &ctx.method,
                                            &ctx.path,
                                            &ctx.client_ip,
                                        )
                                        .with_route_id(
                                            ctx.route_id.as_deref().unwrap_or("unknown"),
                                        )
                                        .with_status_code(status)
                                        .with_reason("Prompt injection detected".to_string());
                                        self.log_manager.log_audit(&audit_entry);

                                        // Send error response
                                        crate::http_helpers::write_json_error(
                                            session,
                                            status,
                                            "prompt_injection_blocked",
                                            Some(&message),
                                        )
                                        .await?;
                                        return Ok(true);
                                    }
                                    PromptInjectionResult::Detected { detections } => {
                                        // Log but allow
                                        warn!(
                                            correlation_id = %ctx.trace_id,
                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                                            detection_count = detections.len(),
                                            "Prompt injection detected (logged only)"
                                        );
                                        ctx.guardrail_detection_categories = detections
                                            .iter()
                                            .map(|d| d.category.clone())
                                            .collect();
                                    }
                                    PromptInjectionResult::Warning { detections } => {
                                        // Set flag for response header
                                        ctx.guardrail_warning = true;
                                        ctx.guardrail_detection_categories = detections
                                            .iter()
                                            .map(|d| d.category.clone())
                                            .collect();
                                        debug!(
                                            correlation_id = %ctx.trace_id,
                                            "Prompt injection warning set"
                                        );
                                    }
                                    PromptInjectionResult::Clean => {
                                        trace!(
                                            correlation_id = %ctx.trace_id,
                                            "No prompt injection detected"
                                        );
                                    }
                                    PromptInjectionResult::Error { message } => {
                                        // Already logged in processor, just trace here
                                        trace!(
                                            correlation_id = %ctx.trace_id,
                                            error = %message,
                                            "Prompt injection check error (failure mode applied)"
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Geo filtering
        if let Some(route_id) = ctx.route_id.as_deref() {
            if let Some(ref route_config) = ctx.route_config {
                for filter_id in &route_config.filters {
                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
                        // Store country code for response header
                        ctx.geo_country_code = result.country_code.clone();
                        ctx.geo_lookup_performed = true;

                        if !result.allowed {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                country = ?result.country_code,
                                filter_id = %filter_id,
                                "Request blocked by geo filter"
                            );
                            self.metrics.record_blocked_request("geo_blocked");

                            // Audit log the geo block
                            let audit_entry = AuditLogEntry::new(
                                &ctx.trace_id,
                                AuditEventType::Blocked,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                            )
                            .with_route_id(route_id)
                            .with_status_code(result.status_code)
                            .with_reason(format!(
                                "Geo blocked: country={}, filter={}",
                                result.country_code.as_deref().unwrap_or("unknown"),
                                filter_id
                            ));
                            self.log_manager.log_audit(&audit_entry);

                            // Send geo block response
                            let body = result
                                .block_message
                                .unwrap_or_else(|| "Access denied".to_string());

                            crate::http_helpers::write_error(
                                session,
                                result.status_code,
                                &body,
                                "text/plain",
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }

                        // Only check first geo filter that matches
                        break;
                    }
                }
            }
        }

        // Check for WebSocket upgrade requests
        let is_websocket_upgrade = session
            .req_header()
            .headers
            .get(http::header::UPGRADE)
            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
            .unwrap_or(false);

        if is_websocket_upgrade {
            ctx.is_websocket_upgrade = true;

            // Check if route allows WebSocket upgrades
            if let Some(ref route_config) = ctx.route_config {
                if !route_config.websocket {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                        client_ip = %ctx.client_ip,
                        "WebSocket upgrade rejected: not enabled for route"
                    );

                    self.metrics.record_blocked_request("websocket_not_enabled");

                    // Audit log the rejection
                    let audit_entry = AuditLogEntry::new(
                        &ctx.trace_id,
                        AuditEventType::Blocked,
                        &ctx.method,
                        &ctx.path,
                        &ctx.client_ip,
                    )
                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
                    .with_action("websocket_rejected")
                    .with_reason("WebSocket not enabled for route");
                    self.log_manager.log_audit(&audit_entry);

                    // Send 403 Forbidden response
                    crate::http_helpers::write_error(
                        session,
                        403,
                        "WebSocket not enabled for this route",
                        "text/plain",
                    )
                    .await?;
                    return Ok(true); // Request complete, don't continue
                }

                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade request allowed"
                );

                // Check for WebSocket frame inspection
                if route_config.websocket_inspection {
                    // Check for compression negotiation - skip inspection if permessage-deflate
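                    // Negotiated compression appears in the handshake as, e.g.,
                    // "Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits" (RFC 7692),
                    // in which case frame payloads would be compressed and cannot be inspected as-is.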
                    let has_compression = session
                        .req_header()
                        .headers
                        .get("Sec-WebSocket-Extensions")
                        .and_then(|v| v.to_str().ok())
                        .map(|s| s.contains("permessage-deflate"))
                        .unwrap_or(false);

                    if has_compression {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            "WebSocket inspection skipped: permessage-deflate negotiated"
                        );
                        ctx.websocket_skip_inspection = true;
                    } else {
                        ctx.websocket_inspection_enabled = true;

                        // Get agents that handle WebSocketFrame events
                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
                            sentinel_agent_protocol::EventType::WebSocketFrame,
                        );

                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            agent_count = ctx.websocket_inspection_agents.len(),
                            "WebSocket frame inspection enabled"
                        );
                    }
                }
            }
        }

        // Use cached route config from upstream_peer (avoids duplicate route matching)
        // Handle static file and builtin routes
        if let Some(route_config) = ctx.route_config.clone() {
            if route_config.service_type == sentinel_config::ServiceType::Static {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "Handling static file route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_static_route(session, ctx, &route_match).await;
            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    builtin_handler = ?route_config.builtin_handler,
                    "Handling builtin route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_builtin_route(session, ctx, &route_match).await;
            }
        }

        // API validation for API routes
        if let Some(route_id) = ctx.route_id.clone() {
            if let Some(validator) = self.validators.get(&route_id).await {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_id,
                    "Running API schema validation"
                );
                if let Some(result) = self
                    .validate_api_request(session, ctx, &route_id, &validator)
                    .await?
                {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_id,
                        validation_passed = result,
                        "API validation complete"
                    );
                    return Ok(result);
                }
            }
        }

        // Get client address before mutable borrow
        let client_addr = session
            .client_addr()
            .map(|a| format!("{}", a))
1267            .unwrap_or_else(|| "unknown".to_string());
        // The client port is not extracted from the peer address here; pass 0 as a placeholder.
1268        let client_port = 0;
1269
1270        let req_header = session.req_header_mut();
1271
1272        // Add correlation ID header
1273        req_header
1274            .insert_header("X-Correlation-Id", &ctx.trace_id)
1275            .ok();
1276        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();
1277
1278        // Use cached config (set in upstream_peer, or fetch now if needed)
1279        let config = ctx
1280            .config
1281            .get_or_insert_with(|| self.config_manager.current());
1282
1283        // Enforce header limits (fast path: skip the checks when a limit is configured so high it is effectively unlimited)
1284        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // threshold applied to both the header count and byte-size limits
1285
1286        // Header count check - O(1)
1287        let header_count = req_header.headers.len();
1288        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1289            && header_count > config.limits.max_header_count
1290        {
1291            warn!(
1292                correlation_id = %ctx.trace_id,
1293                header_count = header_count,
1294                limit = config.limits.max_header_count,
1295                "Request blocked: exceeds header count limit"
1296            );
1297
1298            self.metrics.record_blocked_request("header_count_exceeded");
1299            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1300        }
1301
1302        // Header size check - O(n), skip if limit is very high
1303        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1304            let total_header_size: usize = req_header
1305                .headers
1306                .iter()
1307                .map(|(k, v)| k.as_str().len() + v.len())
1308                .sum();
1309
1310            if total_header_size > config.limits.max_header_size_bytes {
1311                warn!(
1312                    correlation_id = %ctx.trace_id,
1313                    header_size = total_header_size,
1314                    limit = config.limits.max_header_size_bytes,
1315                    "Request blocked: exceeds header size limit"
1316                );
1317
1318                self.metrics.record_blocked_request("header_size_exceeded");
1319                return Err(Error::explain(
1320                    ErrorType::InternalError,
1321                    "Headers too large",
1322                ));
1323            }
1324        }
1325
1326        // Process through external agents
1327        trace!(
1328            correlation_id = %ctx.trace_id,
1329            "Processing request through agents"
1330        );
1331        if let Err(e) = self
1332            .process_agents(session, ctx, &client_addr, client_port)
1333            .await
1334        {
1335            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
1336            // In that case, we need to send a proper HTTP response instead of just closing the connection
1337            if let ErrorType::HTTPStatus(status) = e.etype() {
1338                // Extract the message from the error (the context part after "HTTPStatus context:")
1339                let error_msg = e.to_string();
1340                let body = error_msg
1341                    .split("context:")
1342                    .nth(1)
1343                    .map(|s| s.trim())
1344                    .unwrap_or("Request blocked");
1345                debug!(
1346                    correlation_id = %ctx.trace_id,
1347                    status = status,
1348                    body = %body,
1349                    "Sending HTTP error response for agent block"
1350                );
1351                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1352                return Ok(true); // Request complete, don't continue to upstream
1353            }
1354            // For other errors, propagate them
1355            return Err(e);
1356        }
1357
1358        trace!(
1359            correlation_id = %ctx.trace_id,
1360            "Request filter phase complete, forwarding to upstream"
1361        );
1362
1363        Ok(false) // Continue processing
1364    }
1365
1366    /// Process incoming request body chunks.
1367    /// Used for body size enforcement and WAF/agent inspection.
1368    ///
1369    /// Supports three modes:
1370    /// - **Buffer mode** (default): Buffer chunks until end of stream or limit, then send to agents
1371    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
    /// - **Hybrid mode**: Buffer up to a configured threshold, then stream the remainder
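    ///
    /// A rough sketch of the hybrid split with hypothetical numbers (the real logic is in the
    /// `BodyStreamingMode::Hybrid` arm below):
    ///
    /// ```ignore
    /// // buffer_threshold = 8 bytes, incoming chunk = 12 bytes, nothing buffered yet
    /// let bytes_to_buffer = 12usize.min(8); // 8 bytes are buffered and flushed to the agents
    /// let streamed = 12 - bytes_to_buffer;  // the remaining 4 bytes are streamed chunk-by-chunk
    /// ```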
1372    async fn request_body_filter(
1373        &self,
1374        _session: &mut Session,
1375        body: &mut Option<Bytes>,
1376        end_of_stream: bool,
1377        ctx: &mut Self::CTX,
1378    ) -> Result<(), Box<Error>> {
1379        use sentinel_config::BodyStreamingMode;
1380
1381        // Handle WebSocket frame inspection (client -> server)
1382        if ctx.is_websocket_upgrade {
1383            if let Some(ref handler) = ctx.websocket_handler {
1384                let result = handler.process_client_data(body.take()).await;
1385                match result {
1386                    crate::websocket::ProcessResult::Forward(data) => {
1387                        *body = data;
1388                    }
1389                    crate::websocket::ProcessResult::Close(reason) => {
1390                        warn!(
1391                            correlation_id = %ctx.trace_id,
1392                            code = reason.code,
1393                            reason = %reason.reason,
1394                            "WebSocket connection closed by agent (client->server)"
1395                        );
1396                        // Return an error to close the connection
1397                        return Err(Error::explain(
1398                            ErrorType::InternalError,
1399                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
1400                        ));
1401                    }
1402                }
1403            }
1404            // Skip normal body processing for WebSocket
1405            return Ok(());
1406        }
1407
1408        // Track request body size
1409        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1410        if chunk_len > 0 {
1411            ctx.request_body_bytes += chunk_len as u64;
1412
1413            trace!(
1414                correlation_id = %ctx.trace_id,
1415                chunk_size = chunk_len,
1416                total_body_bytes = ctx.request_body_bytes,
1417                end_of_stream = end_of_stream,
1418                streaming_mode = ?ctx.request_body_streaming_mode,
1419                "Processing request body chunk"
1420            );
1421
1422            // Check body size limit (use cached config)
1423            let config = ctx
1424                .config
1425                .get_or_insert_with(|| self.config_manager.current());
1426            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1427                warn!(
1428                    correlation_id = %ctx.trace_id,
1429                    body_bytes = ctx.request_body_bytes,
1430                    limit = config.limits.max_body_size_bytes,
1431                    "Request body size limit exceeded"
1432                );
1433                self.metrics.record_blocked_request("body_size_exceeded");
1434                return Err(Error::explain(
1435                    ErrorType::InternalError,
1436                    "Request body too large",
1437                ));
1438            }
1439        }
1440
1441        // Body inspection for agents (WAF, etc.)
1442        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1443            let config = ctx
1444                .config
1445                .get_or_insert_with(|| self.config_manager.current());
1446            let max_inspection_bytes = config
1447                .waf
1448                .as_ref()
1449                .map(|w| w.body_inspection.max_inspection_bytes as u64)
1450                .unwrap_or(1024 * 1024);
1451
1452            match ctx.request_body_streaming_mode {
1453                BodyStreamingMode::Stream => {
1454                    // Stream mode: send each chunk immediately
1455                    if body.is_some() {
1456                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1457                            .await?;
1458                    } else if end_of_stream && ctx.agent_needs_more {
1459                        // Send final empty chunk to signal end
1460                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1461                            .await?;
1462                    }
1463                }
1464                BodyStreamingMode::Hybrid { buffer_threshold } => {
1465                    // Hybrid mode: buffer up to threshold, then stream
1466                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1467                        // Still in buffering phase
1468                        if let Some(ref chunk) = body {
1469                            let bytes_to_buffer = std::cmp::min(
1470                                chunk.len(),
1471                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1472                            );
1473                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1474                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1475
1476                            // If we've reached threshold or end of stream, switch to streaming
1477                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1478                            {
1479                                // Send buffered content first
1480                                self.send_buffered_body_to_agents(
1481                                    end_of_stream && chunk.len() == bytes_to_buffer,
1482                                    ctx,
1483                                )
1484                                .await?;
1485                                ctx.body_buffer.clear();
1486
1487                                // If there's remaining data in this chunk, stream it
1488                                if bytes_to_buffer < chunk.len() {
1489                                    let remaining = chunk.slice(bytes_to_buffer..);
1490                                    let mut remaining_body = Some(remaining);
1491                                    self.process_body_chunk_streaming(
1492                                        &mut remaining_body,
1493                                        end_of_stream,
1494                                        ctx,
1495                                    )
1496                                    .await?;
1497                                }
1498                            }
1499                        }
1500                    } else {
1501                        // Past threshold, stream directly
1502                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1503                            .await?;
1504                    }
1505                }
1506                BodyStreamingMode::Buffer => {
1507                    // Buffer mode: collect chunks until ready to send
1508                    if let Some(ref chunk) = body {
1509                        if ctx.body_bytes_inspected < max_inspection_bytes {
1510                            let bytes_to_inspect = std::cmp::min(
1511                                chunk.len() as u64,
1512                                max_inspection_bytes - ctx.body_bytes_inspected,
1513                            ) as usize;
1514
1515                            ctx.body_buffer
1516                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1517                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1518
1519                            trace!(
1520                                correlation_id = %ctx.trace_id,
1521                                bytes_inspected = ctx.body_bytes_inspected,
1522                                max_inspection_bytes = max_inspection_bytes,
1523                                buffer_size = ctx.body_buffer.len(),
1524                                "Buffering body for agent inspection"
1525                            );
1526                        }
1527                    }
1528
1529                    // Send when complete or limit reached
1530                    let should_send =
1531                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1532                    if should_send && !ctx.body_buffer.is_empty() {
1533                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1534                            .await?;
1535                        ctx.body_buffer.clear();
1536                    }
1537                }
1538            }
1539        }
1540
1541        if end_of_stream {
1542            trace!(
1543                correlation_id = %ctx.trace_id,
1544                total_body_bytes = ctx.request_body_bytes,
1545                bytes_inspected = ctx.body_bytes_inspected,
1546                "Request body complete"
1547            );
1548        }
1549
1550        Ok(())
1551    }
1552
1553    async fn response_filter(
1554        &self,
1555        _session: &mut Session,
1556        upstream_response: &mut ResponseHeader,
1557        ctx: &mut Self::CTX,
1558    ) -> Result<(), Box<Error>> {
1559        let status = upstream_response.status.as_u16();
1560        let duration = ctx.elapsed();
1561
1562        trace!(
1563            correlation_id = %ctx.trace_id,
1564            status = status,
1565            "Starting response filter phase"
1566        );
1567
1568        // Handle WebSocket 101 Switching Protocols
1569        if status == 101 && ctx.is_websocket_upgrade {
1570            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1571                // Create WebSocket inspector and handler with metrics
1572                let inspector = crate::websocket::WebSocketInspector::with_metrics(
1573                    self.agent_manager.clone(),
1574                    ctx.route_id
1575                        .clone()
1576                        .unwrap_or_else(|| "unknown".to_string()),
1577                    ctx.trace_id.clone(),
1578                    ctx.client_ip.clone(),
1579                    100, // 100ms timeout per frame inspection
1580                    Some(self.metrics.clone()),
1581                );
1582
1583                let handler = crate::websocket::WebSocketHandler::new(
1584                    std::sync::Arc::new(inspector),
1585                    1024 * 1024, // 1MB max frame size
1586                );
1587
1588                ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1589
1590                info!(
1591                    correlation_id = %ctx.trace_id,
1592                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1593                    agent_count = ctx.websocket_inspection_agents.len(),
1594                    "WebSocket upgrade successful, frame inspection enabled"
1595                );
1596            } else if ctx.websocket_skip_inspection {
1597                debug!(
1598                    correlation_id = %ctx.trace_id,
1599                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1600                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1601                );
1602            } else {
1603                debug!(
1604                    correlation_id = %ctx.trace_id,
1605                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1606                    "WebSocket upgrade successful"
1607                );
1608            }
1609        }
1610
1611        // Apply security headers
1612        trace!(
1613            correlation_id = %ctx.trace_id,
1614            "Applying security headers"
1615        );
1616        self.apply_security_headers(upstream_response).ok();
1617
1618        // Add correlation ID to response
1619        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1620
1621        // Add rate limit headers if rate limiting was applied
1622        if let Some(ref rate_info) = ctx.rate_limit_info {
1623            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1624            upstream_response
1625                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1626            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1627        }
1628
1629        // Add token budget headers if budget tracking was enabled
1630        if ctx.inference_budget_enabled {
1631            if let Some(remaining) = ctx.inference_budget_remaining {
1632                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1633            }
1634            if let Some(period_reset) = ctx.inference_budget_period_reset {
1635                // Format as ISO 8601 timestamp
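                // e.g. a period_reset of 1714564800 renders as "2024-05-01T12:00:00Z"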
1636                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1637                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1638                    .unwrap_or_else(|| period_reset.to_string());
1639                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1640            }
1641        }
1642
1643        // Add GeoIP country header if geo lookup was performed
1644        if let Some(ref country_code) = ctx.geo_country_code {
1645            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1646        }
1647
1648        // Add guardrail warning header if prompt injection was detected (warn mode)
1649        if ctx.guardrail_warning {
1650            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1651        }
1652
1653        // Add fallback routing headers if fallback was used
1654        if ctx.used_fallback() {
1655            upstream_response.insert_header("X-Fallback-Used", "true")?;
1656
1657            if let Some(ref upstream) = ctx.upstream {
1658                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1659            }
1660
1661            if let Some(ref reason) = ctx.fallback_reason {
1662                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1663            }
1664
1665            if let Some(ref original) = ctx.original_upstream {
1666                upstream_response.insert_header("X-Original-Upstream", original)?;
1667            }
1668
1669            if let Some(ref mapping) = ctx.model_mapping_applied {
1670                upstream_response.insert_header(
1671                    "X-Model-Mapping",
1672                    format!("{} -> {}", mapping.0, mapping.1),
1673                )?;
1674            }
1675
1676            trace!(
1677                correlation_id = %ctx.trace_id,
1678                fallback_attempt = ctx.fallback_attempt,
1679                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1680                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1681                "Added fallback response headers"
1682            );
1683
1684            // Record fallback success metrics for successful responses (2xx/3xx)
1685            if status < 400 {
1686                if let Some(metrics) = get_fallback_metrics() {
1687                    metrics.record_fallback_success(
1688                        ctx.route_id.as_deref().unwrap_or("unknown"),
1689                        ctx.upstream.as_deref().unwrap_or("unknown"),
1690                    );
1691                }
1692            }
1693        }
1694
1695        // Initialize streaming token counter for SSE responses on inference routes
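        // SSE responses are identified by their media type (typically "text/event-stream"),
        // which is how OpenAI-style APIs deliver streamed completions.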
1696        if ctx.inference_rate_limit_enabled {
1697            // Check if this is an SSE response
1698            let content_type = upstream_response
1699                .headers
1700                .get("content-type")
1701                .and_then(|ct| ct.to_str().ok());
1702
1703            if is_sse_response(content_type) {
1704                // Get provider from route config
1705                let provider = ctx
1706                    .route_config
1707                    .as_ref()
1708                    .and_then(|r| r.inference.as_ref())
1709                    .map(|i| i.provider.clone())
1710                    .unwrap_or_default();
1711
1712                ctx.inference_streaming_response = true;
1713                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1714                    provider,
1715                    ctx.inference_model.clone(),
1716                ));
1717
1718                trace!(
1719                    correlation_id = %ctx.trace_id,
1720                    content_type = ?content_type,
1721                    model = ?ctx.inference_model,
1722                    "Initialized streaming token counter for SSE response"
1723                );
1724            }
1725        }
1726
1727        // Generate custom error pages for error responses
1728        if status >= 400 {
1729            trace!(
1730                correlation_id = %ctx.trace_id,
1731                status = status,
1732                "Handling error response"
1733            );
1734            self.handle_error_response(upstream_response, ctx).await?;
1735        }
1736
1737        // Record metrics
1738        self.metrics.record_request(
1739            ctx.route_id.as_deref().unwrap_or("unknown"),
1740            &ctx.method,
1741            status,
1742            duration,
1743        );
1744
1745        // Record OpenTelemetry span status
1746        if let Some(ref mut span) = ctx.otel_span {
1747            span.set_status(status);
1748            if let Some(ref upstream) = ctx.upstream {
1749                span.set_upstream(upstream, "");
1750            }
1751            if status >= 500 {
1752                span.record_error(&format!("HTTP {}", status));
1753            }
1754        }
1755
1756        // Record passive health check
1757        if let Some(ref upstream) = ctx.upstream {
1758            let success = status < 500;
1759
1760            trace!(
1761                correlation_id = %ctx.trace_id,
1762                upstream = %upstream,
1763                success = success,
1764                status = status,
1765                "Recording passive health check result"
1766            );
1767
1768            let error_msg = if !success {
1769                Some(format!("HTTP {}", status))
1770            } else {
1771                None
1772            };
1773            self.passive_health
1774                .record_outcome(upstream, success, error_msg.as_deref())
1775                .await;
1776
1777            // Report to upstream pool
1778            if let Some(pool) = self.upstream_pools.get(upstream).await {
1779                pool.report_result(upstream, success).await;
1780            }
1781
1782            if !success {
1783                warn!(
1784                    correlation_id = %ctx.trace_id,
1785                    upstream = %upstream,
1786                    status = status,
1787                    "Upstream returned error status"
1788                );
1789            }
1790        }
1791
1792        // Final request completion log
1793        if status >= 500 {
1794            error!(
1795                correlation_id = %ctx.trace_id,
1796                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1797                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1798                method = %ctx.method,
1799                path = %ctx.path,
1800                status = status,
1801                duration_ms = duration.as_millis(),
1802                attempts = ctx.upstream_attempts,
1803                "Request completed with server error"
1804            );
1805        } else if status >= 400 {
1806            warn!(
1807                correlation_id = %ctx.trace_id,
1808                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1809                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1810                method = %ctx.method,
1811                path = %ctx.path,
1812                status = status,
1813                duration_ms = duration.as_millis(),
1814                "Request completed with client error"
1815            );
1816        } else {
1817            debug!(
1818                correlation_id = %ctx.trace_id,
1819                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1820                upstream = ctx.upstream.as_deref().unwrap_or("none"),
1821                method = %ctx.method,
1822                path = %ctx.path,
1823                status = status,
1824                duration_ms = duration.as_millis(),
1825                attempts = ctx.upstream_attempts,
1826                "Request completed"
1827            );
1828        }
1829
1830        Ok(())
1831    }
1832
1833    /// Modify the request before sending to upstream.
1834    /// Used for header modifications, adding authentication, etc.
1835    async fn upstream_request_filter(
1836        &self,
1837        _session: &mut Session,
1838        upstream_request: &mut pingora::http::RequestHeader,
1839        ctx: &mut Self::CTX,
1840    ) -> Result<()>
1841    where
1842        Self::CTX: Send + Sync,
1843    {
1844        trace!(
1845            correlation_id = %ctx.trace_id,
1846            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1847            "Applying upstream request modifications"
1848        );
1849
1850        // Add trace ID header for upstream correlation
1851        upstream_request
1852            .insert_header("X-Trace-Id", &ctx.trace_id)
1853            .ok();
1854
1855        // Add W3C traceparent header for distributed tracing
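        // W3C Trace Context format: "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>",
        // e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" for a sampled span.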
1856        if let Some(ref span) = ctx.otel_span {
1857            let sampled = ctx.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
1858            let traceparent =
1859                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
1860            upstream_request
1861                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
1862                .ok();
1863        }
1864
1865        // Add request metadata headers
1866        upstream_request
1867            .insert_header("X-Forwarded-By", "Sentinel")
1868            .ok();
1869
1870        // Apply route-specific request header modifications
1871        // Clone the modifications to avoid lifetime issues with the header API
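        // For illustration only (the exact schema lives in sentinel_config), a route policy
        // might declare something roughly like:
        //   request_headers.set    = { "X-Env" = "prod" }   # overwrite if present
        //   request_headers.add    = { "X-Tag" = "edge" }   # append another value
        //   request_headers.remove = ["X-Debug"]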
1872        if let Some(ref route_config) = ctx.route_config {
1873            let mods = route_config.policies.request_headers.clone();
1874
1875            // Set headers (overwrite existing)
1876            for (name, value) in mods.set {
1877                upstream_request.insert_header(name, value).ok();
1878            }
1879
1880            // Add headers (append)
1881            for (name, value) in mods.add {
1882                upstream_request.append_header(name, value).ok();
1883            }
1884
1885            // Remove headers
1886            for name in &mods.remove {
1887                upstream_request.remove_header(name);
1888            }
1889
1890            trace!(
1891                correlation_id = %ctx.trace_id,
1892                "Applied request header modifications"
1893            );
1894        }
1895
1896        // Remove sensitive headers that shouldn't go to upstream
1897        upstream_request.remove_header("X-Internal-Token");
1898        upstream_request.remove_header("Authorization-Internal");
1899
1900        // === Traffic Mirroring / Shadowing ===
1901        // Check if this route has shadow configuration
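        // Flow: sample the request (percentage + header check), clone the upstream request
        // headers, then either fire the shadow copy immediately or defer it until the request
        // body has been buffered (see ShadowPendingRequest below).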
1902        if let Some(ref route_config) = ctx.route_config {
1903            if let Some(ref shadow_config) = route_config.shadow {
1904                // Get snapshot of upstream pools for shadow manager
1905                let pools_snapshot = self.upstream_pools.snapshot().await;
1906                let upstream_pools = std::sync::Arc::new(pools_snapshot);
1907
1908                // Get route ID for metrics labeling
1909                let route_id = ctx.route_id.clone().unwrap_or_else(|| "unknown".to_string());
1910
1911                // Create shadow manager
1912                let shadow_manager = crate::shadow::ShadowManager::new(
1913                    upstream_pools,
1914                    shadow_config.clone(),
1915                    Some(std::sync::Arc::clone(&self.metrics)),
1916                    route_id,
1917                );
1918
1919                // Check if we should shadow this request (sampling + header check)
1920                if shadow_manager.should_shadow(upstream_request) {
1921                    trace!(
1922                        correlation_id = %ctx.trace_id,
1923                        shadow_upstream = %shadow_config.upstream,
1924                        percentage = shadow_config.percentage,
1925                        "Shadowing request"
1926                    );
1927
1928                    // Clone headers for shadow request
1929                    let shadow_headers = upstream_request.clone();
1930
1931                    // Create request context for shadow (simplified from proxy context)
1932                    let shadow_ctx = crate::upstream::RequestContext {
1933                        client_ip: ctx.client_ip.parse().ok(),
1934                        headers: std::collections::HashMap::new(), // Empty for now
1935                        path: ctx.path.clone(),
1936                        method: ctx.method.clone(),
1937                    };
1938
1939                    // Determine if we should buffer the body
1940                    let buffer_body = shadow_config.buffer_body
1941                        && crate::shadow::should_buffer_method(&ctx.method);
1942
1943                    if buffer_body {
1944                        // Body buffering requested - defer shadow request until body is available
1945                        // Store shadow info in context; will be fired in logging phase
1946                        // or when request body filter completes
1947                        trace!(
1948                            correlation_id = %ctx.trace_id,
1949                            "Deferring shadow request until body is buffered"
1950                        );
1951                        ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
1952                            headers: shadow_headers,
1953                            manager: std::sync::Arc::new(shadow_manager),
1954                            request_ctx: shadow_ctx,
1955                            include_body: true,
1956                        });
1957                        // Enable body inspection to capture the body for shadow
1958                        // (only if not already enabled for other reasons)
1959                        if !ctx.body_inspection_enabled {
1960                            ctx.body_inspection_enabled = true;
1961                            // Set a reasonable buffer limit from shadow config
1962                            // (body_buffer will accumulate chunks)
1963                        }
1964                    } else {
1965                        // No body buffering needed - fire shadow request immediately
1966                        shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
1967                        ctx.shadow_sent = true;
1968                    }
1969                }
1970            }
1971        }
1972
1973        Ok(())
1974    }
1975
1976    /// Process response body chunks from upstream.
1977    /// Used for response size tracking and WAF inspection.
1978    ///
1979    /// Note: Response body inspection is currently buffered only (streaming mode not supported
1980    /// for responses due to Pingora's synchronous filter design).
1981    fn response_body_filter(
1982        &self,
1983        _session: &mut Session,
1984        body: &mut Option<Bytes>,
1985        end_of_stream: bool,
1986        ctx: &mut Self::CTX,
1987    ) -> Result<Option<Duration>, Box<Error>> {
1988        // Handle WebSocket frame inspection (server -> client)
1989        // Note: This filter is synchronous, so we use block_in_place for async agent calls
1990        if ctx.is_websocket_upgrade {
1991            if let Some(ref handler) = ctx.websocket_handler {
1992                let handler = handler.clone();
1993                let data = body.take();
1994
1995                // Use block_in_place to run async handler from sync context
1996                // This is safe because Pingora uses a multi-threaded tokio runtime
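                // Caveat: tokio::task::block_in_place panics on a current-thread runtime, so this
                // relies on the proxy running under the multi-threaded scheduler.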
1997                let result = tokio::task::block_in_place(|| {
1998                    tokio::runtime::Handle::current()
1999                        .block_on(async { handler.process_server_data(data).await })
2000                });
2001
2002                match result {
2003                    crate::websocket::ProcessResult::Forward(data) => {
2004                        *body = data;
2005                    }
2006                    crate::websocket::ProcessResult::Close(reason) => {
2007                        warn!(
2008                            correlation_id = %ctx.trace_id,
2009                            code = reason.code,
2010                            reason = %reason.reason,
2011                            "WebSocket connection closed by agent (server->client)"
2012                        );
2013                        // For sync filter, we can't return an error that closes the connection
2014                        // Instead, inject a close frame
2015                        let close_frame =
2016                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2017                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2018                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2019                            *body = Some(Bytes::from(encoded));
2020                        }
2021                    }
2022                }
2023            }
2024            // Skip normal body processing for WebSocket
2025            return Ok(None);
2026        }
2027
2028        // Track response body size
2029        if let Some(ref chunk) = body {
2030            ctx.response_bytes += chunk.len() as u64;
2031
2032            trace!(
2033                correlation_id = %ctx.trace_id,
2034                chunk_size = chunk.len(),
2035                total_response_bytes = ctx.response_bytes,
2036                end_of_stream = end_of_stream,
2037                "Processing response body chunk"
2038            );
2039
2040            // Process SSE chunks for streaming token counting
2041            if let Some(ref mut counter) = ctx.inference_streaming_counter {
2042                let result = counter.process_chunk(chunk);
2043
2044                if result.content.is_some() || result.is_done {
2045                    trace!(
2046                        correlation_id = %ctx.trace_id,
2047                        has_content = result.content.is_some(),
2048                        is_done = result.is_done,
2049                        chunks_processed = counter.chunks_processed(),
2050                        accumulated_content_len = counter.content().len(),
2051                        "Processed SSE chunk for token counting"
2052                    );
2053                }
2054            }
2055
2056            // Response body inspection (buffered mode only)
2057            // Note: Streaming mode for response bodies is not currently supported
2058            // due to Pingora's synchronous response_body_filter design
2059            if ctx.response_body_inspection_enabled
2060                && !ctx.response_body_inspection_agents.is_empty()
2061            {
2062                let config = ctx
2063                    .config
2064                    .get_or_insert_with(|| self.config_manager.current());
2065                let max_inspection_bytes = config
2066                    .waf
2067                    .as_ref()
2068                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
2069                    .unwrap_or(1024 * 1024);
2070
2071                if ctx.response_body_bytes_inspected < max_inspection_bytes {
2072                    let bytes_to_inspect = std::cmp::min(
2073                        chunk.len() as u64,
2074                        max_inspection_bytes - ctx.response_body_bytes_inspected,
2075                    ) as usize;
2076
2077                    // Track the inspected byte count and chunk index for this response.
2078                    // Response body inspection runs asynchronously (during the logging phase)
2079                    // and its results are logged rather than blocking the response.
2080                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2081                    ctx.response_body_chunk_index += 1;
2082
2083                    trace!(
2084                        correlation_id = %ctx.trace_id,
2085                        bytes_inspected = ctx.response_body_bytes_inspected,
2086                        max_inspection_bytes = max_inspection_bytes,
2087                        chunk_index = ctx.response_body_chunk_index,
2088                        "Tracking response body for inspection"
2089                    );
2090                }
2091            }
2092        }
2093
2094        if end_of_stream {
2095            trace!(
2096                correlation_id = %ctx.trace_id,
2097                total_response_bytes = ctx.response_bytes,
2098                response_bytes_inspected = ctx.response_body_bytes_inspected,
2099                "Response body complete"
2100            );
2101        }
2102
2103        // Return None to indicate no delay needed
2104        Ok(None)
2105    }
2106
2107    /// Called when a connection to upstream is established or reused.
2108    /// Logs connection reuse statistics for observability.
2109    async fn connected_to_upstream(
2110        &self,
2111        _session: &mut Session,
2112        reused: bool,
2113        peer: &HttpPeer,
2114        #[cfg(unix)] _fd: RawFd,
2115        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2116        digest: Option<&Digest>,
2117        ctx: &mut Self::CTX,
2118    ) -> Result<(), Box<Error>> {
2119        // Track connection reuse for metrics
2120        ctx.connection_reused = reused;
2121
2122        // Log connection establishment/reuse
2123        if reused {
2124            trace!(
2125                correlation_id = %ctx.trace_id,
2126                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2127                peer_address = %peer.address(),
2128                "Reusing existing upstream connection"
2129            );
2130        } else {
2131            debug!(
2132                correlation_id = %ctx.trace_id,
2133                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2134                peer_address = %peer.address(),
2135                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2136                "Established new upstream connection"
2137            );
2138        }
2139
2140        Ok(())
2141    }
2142
2143    // =========================================================================
2144    // HTTP Caching - Pingora Cache Integration
2145    // =========================================================================
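    //
    // Rough lifecycle of the callbacks below: request_cache_filter enables caching,
    // cache_key_callback derives the lookup key, a lookup result goes through cache_hit_filter
    // (or cache_miss), a fetched response is judged by response_cache_filter, and
    // should_serve_stale / range_header_filter cover stale serving and byte ranges.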
2146
2147    /// Decide if the request should use caching.
2148    ///
2149    /// This method is called early in the request lifecycle to determine if
2150    /// the response should be served from cache or if the response should
2151    /// be cached.
2152    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2153        // Check if route has caching enabled
2154        let route_id = match ctx.route_id.as_deref() {
2155            Some(id) => id,
2156            None => {
2157                trace!(
2158                    correlation_id = %ctx.trace_id,
2159                    "Cache filter: no route ID, skipping cache"
2160                );
2161                return Ok(());
2162            }
2163        };
2164
2165        // Check if caching is enabled for this route
2166        if !self.cache_manager.is_enabled(route_id) {
2167            trace!(
2168                correlation_id = %ctx.trace_id,
2169                route_id = %route_id,
2170                "Cache disabled for route"
2171            );
2172            return Ok(());
2173        }
2174
2175        // Check if method is cacheable (typically GET/HEAD)
2176        if !self
2177            .cache_manager
2178            .is_method_cacheable(route_id, &ctx.method)
2179        {
2180            trace!(
2181                correlation_id = %ctx.trace_id,
2182                route_id = %route_id,
2183                method = %ctx.method,
2184                "Method not cacheable"
2185            );
2186            return Ok(());
2187        }
2188
2189        // Enable caching for this request using Pingora's cache infrastructure
2190        debug!(
2191            correlation_id = %ctx.trace_id,
2192            route_id = %route_id,
2193            method = %ctx.method,
2194            path = %ctx.path,
2195            "Enabling HTTP caching for request"
2196        );
2197
2198        // Get static references to cache infrastructure
2199        let storage = get_cache_storage();
2200        let eviction = get_cache_eviction();
2201        let cache_lock = get_cache_lock();
2202
2203        // Enable the cache with storage, eviction, and lock
2204        session.cache.enable(
2205            storage,
2206            Some(eviction),
2207            None, // predictor - optional
2208            Some(cache_lock),
2209            None, // option overrides
2210        );
2211
2212        // Mark request as cache-eligible in context
2213        ctx.cache_eligible = true;
2214
2215        trace!(
2216            correlation_id = %ctx.trace_id,
2217            route_id = %route_id,
2218            cache_enabled = session.cache.enabled(),
2219            "Cache enabled for request"
2220        );
2221
2222        Ok(())
2223    }
2224
2225    /// Generate the cache key for this request.
2226    ///
2227    /// The cache key uniquely identifies the cached response. It typically
2228    /// includes the method, host, path, and potentially query parameters.
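    ///
    /// Illustrative only (the exact string format is an implementation detail of
    /// `CacheManager::generate_cache_key`): a key for `GET http://example.com/api/users?page=2`
    /// combines the method, host, path, and query.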
2229    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2230        let req_header = session.req_header();
2231        let method = req_header.method.as_str();
2232        let path = req_header.uri.path();
2233        let host = ctx.host.as_deref().unwrap_or("unknown");
2234        let query = req_header.uri.query();
2235
2236        // Generate a human-readable cache key for tracing (the actual lookup key below is Pingora's default)
2237        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2238
2239        trace!(
2240            correlation_id = %ctx.trace_id,
2241            cache_key = %key_string,
2242            "Generated cache key"
2243        );
2244
2245        // Use Pingora's default cache key generator which handles
2246        // proper hashing and internal format
2247        Ok(CacheKey::default(req_header))
2248    }
2249
2250    /// Called when a cache miss occurs.
2251    ///
2252    /// This is called when the cache lookup found no matching entry.
2253    /// We can use this to log and track cache misses.
2254    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2255        // Let Pingora handle the cache miss
2256        session.cache.cache_miss();
2257
2258        // Track statistics
2259        if let Some(route_id) = ctx.route_id.as_deref() {
2260            self.cache_manager.stats().record_miss();
2261
2262            trace!(
2263                correlation_id = %ctx.trace_id,
2264                route_id = %route_id,
2265                path = %ctx.path,
2266                "Cache miss"
2267            );
2268        }
2269    }
2270
2271    /// Called after a successful cache lookup.
2272    ///
2273    /// This filter allows inspecting the cached response before serving it.
2274    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
2275    /// to invalidate and refetch.
2276    async fn cache_hit_filter(
2277        &self,
2278        session: &mut Session,
2279        meta: &CacheMeta,
2280        _hit_handler: &mut HitHandler,
2281        is_fresh: bool,
2282        ctx: &mut Self::CTX,
2283    ) -> Result<Option<ForcedInvalidationKind>>
2284    where
2285        Self::CTX: Send + Sync,
2286    {
2287        // Check if this cache entry should be invalidated due to a purge request
2288        let req_header = session.req_header();
2289        let method = req_header.method.as_str();
2290        let path = req_header.uri.path();
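        // Note: uri.host() is only present for absolute-form request targets; origin-form
        // requests fall back to "localhost" here (the Host header itself is kept in ctx.host).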
2291        let host = req_header.uri.host().unwrap_or("localhost");
2292        let query = req_header.uri.query();
2293
2294        // Generate the cache key for this request
2295        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2296
2297        // Check if this key should be invalidated
2298        if self.cache_manager.should_invalidate(&cache_key) {
2299            info!(
2300                correlation_id = %ctx.trace_id,
2301                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2302                cache_key = %cache_key,
2303                "Cache entry invalidated by purge request"
2304            );
2305            // Force expiration so the entry is refetched from upstream
2306            return Ok(Some(ForcedInvalidationKind::ForceExpired));
2307        }
2308
2309        // Track cache hit statistics
2310        if is_fresh {
2311            self.cache_manager.stats().record_hit();
2312
2313            debug!(
2314                correlation_id = %ctx.trace_id,
2315                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2316                is_fresh = is_fresh,
2317                "Cache hit (fresh)"
2318            );
2319        } else {
2320            trace!(
2321                correlation_id = %ctx.trace_id,
2322                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2323                is_fresh = is_fresh,
2324                "Cache hit (stale)"
2325            );
2326        }
2327
2328        // Serve the cached response without invalidation
2329        Ok(None)
2330    }
2331
2332    /// Decide if the response should be cached.
2333    ///
2334    /// Called after receiving the response from upstream to determine
2335    /// if it should be stored in the cache.
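    ///
    /// Illustrative outcome (assuming `calculate_ttl` honors `max-age`): a 200 response with
    /// `Cache-Control: public, max-age=300` on a cache-enabled route is stored with a 300s
    /// freshness TTL, while `Cache-Control: no-store` yields `Uncacheable(OriginNotCache)`.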
2336    fn response_cache_filter(
2337        &self,
2338        _session: &Session,
2339        resp: &ResponseHeader,
2340        ctx: &mut Self::CTX,
2341    ) -> Result<RespCacheable> {
2342        let route_id = match ctx.route_id.as_deref() {
2343            Some(id) => id,
2344            None => {
2345                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2346                    "no_route",
2347                )));
2348            }
2349        };
2350
2351        // Check if caching is enabled for this route
2352        if !self.cache_manager.is_enabled(route_id) {
2353            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2354                "disabled",
2355            )));
2356        }
2357
2358        let status = resp.status.as_u16();
2359
2360        // Check if status code is cacheable
2361        if !self.cache_manager.is_status_cacheable(route_id, status) {
2362            trace!(
2363                correlation_id = %ctx.trace_id,
2364                route_id = %route_id,
2365                status = status,
2366                "Status code not cacheable"
2367            );
2368            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2369        }
2370
2371        // Check Cache-Control header for no-store, no-cache, private
2372        if let Some(cache_control) = resp.headers.get("cache-control") {
2373            if let Ok(cc_str) = cache_control.to_str() {
2374                if crate::cache::CacheManager::is_no_cache(cc_str) {
2375                    trace!(
2376                        correlation_id = %ctx.trace_id,
2377                        route_id = %route_id,
2378                        cache_control = %cc_str,
2379                        "Response has no-cache directive"
2380                    );
2381                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2382                }
2383            }
2384        }
2385
2386        // Calculate TTL from Cache-Control or use default
2387        let cache_control = resp
2388            .headers
2389            .get("cache-control")
2390            .and_then(|v| v.to_str().ok());
2391        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2392
2393        if ttl.is_zero() {
2394            trace!(
2395                correlation_id = %ctx.trace_id,
2396                route_id = %route_id,
2397                "TTL is zero, not caching"
2398            );
2399            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2400        }
2401
2402        // Get route cache config for stale settings
2403        let config = self
2404            .cache_manager
2405            .get_route_config(route_id)
2406            .unwrap_or_default();
2407
2408        // Create timestamps for cache metadata
2409        let now = std::time::SystemTime::now();
2410        let fresh_until = now + ttl;
2411
2412        // Clone the response header for storage
2413        let header = resp.clone();
2414
2415        // Create CacheMeta with proper timestamps and TTLs
2416        let cache_meta = CacheMeta::new(
2417            fresh_until,
2418            now,
2419            config.stale_while_revalidate_secs as u32,
2420            config.stale_if_error_secs as u32,
2421            header,
2422        );
2423
2424        // Track the cache store
2425        self.cache_manager.stats().record_store();
2426
2427        debug!(
2428            correlation_id = %ctx.trace_id,
2429            route_id = %route_id,
2430            status = status,
2431            ttl_secs = ttl.as_secs(),
2432            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2433            stale_if_error_secs = config.stale_if_error_secs,
2434            "Caching response"
2435        );
2436
2437        Ok(RespCacheable::Cacheable(cache_meta))
2438    }
2439
2440    /// Decide whether to serve stale content on error or during revalidation.
2441    ///
2442    /// This implements stale-while-revalidate and stale-if-error semantics.
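    /// (These mirror the RFC 5861 Cache-Control extensions.) Stale content may be served when
    /// an upstream error occurs and `stale_if_error_secs > 0`, or during background
    /// revalidation when `stale_while_revalidate_secs > 0`.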
2443    fn should_serve_stale(
2444        &self,
2445        _session: &mut Session,
2446        ctx: &mut Self::CTX,
2447        error: Option<&Error>,
2448    ) -> bool {
2449        let route_id = match ctx.route_id.as_deref() {
2450            Some(id) => id,
2451            None => return false,
2452        };
2453
2454        // Get route cache config for stale settings
2455        let config = match self.cache_manager.get_route_config(route_id) {
2456            Some(c) => c,
2457            None => return false,
2458        };
2459
2460        // If there's an upstream error, use stale-if-error
2461        if let Some(e) = error {
2462            // Only serve stale for upstream errors
2463            if e.esource() == &pingora::ErrorSource::Upstream {
2464                debug!(
2465                    correlation_id = %ctx.trace_id,
2466                    route_id = %route_id,
2467                    error = %e,
2468                    stale_if_error_secs = config.stale_if_error_secs,
2469                    "Considering stale-if-error"
2470                );
2471                return config.stale_if_error_secs > 0;
2472            }
2473        }
2474
2475        // During stale-while-revalidate (error is None)
2476        if error.is_none() && config.stale_while_revalidate_secs > 0 {
2477            trace!(
2478                correlation_id = %ctx.trace_id,
2479                route_id = %route_id,
2480                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2481                "Allowing stale-while-revalidate"
2482            );
2483            return true;
2484        }
2485
2486        false
2487    }
2488
2489    /// Handle Range header for byte-range requests (streaming support).
2490    ///
2491    /// This method is called when a Range header is present in the request.
2492    /// It allows proper handling of:
2493    /// - Video streaming (HTML5 video seeking)
2494    /// - Large file downloads with resume support
2495    /// - Partial content delivery
2496    ///
2497    /// Uses Pingora's built-in range handling with route-specific logging.
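    ///
    /// For example, a `Range: bytes=0-1023` request on a Static route is handled as a
    /// single byte range, while routes whose service type is neither Static nor Web skip
    /// range handling entirely and get `RangeType::None`.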
2498    fn range_header_filter(
2499        &self,
2500        session: &mut Session,
2501        response: &mut ResponseHeader,
2502        ctx: &mut Self::CTX,
2503    ) -> pingora_proxy::RangeType
2504    where
2505        Self::CTX: Send + Sync,
2506    {
2507        // Check if route supports range requests
2508        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
2509            // Static and Web service types serve files/media and should support range requests
2510            matches!(
2511                config.service_type,
2512                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
2513            )
2514        });
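        // Note: routes without an attached config default to allowing range requests (is_none_or).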
2515
2516        if !supports_range {
2517            trace!(
2518                correlation_id = %ctx.trace_id,
2519                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2520                "Range request not supported for this route type"
2521            );
2522            return pingora_proxy::RangeType::None;
2523        }
2524
2525        // Use Pingora's built-in range header parsing and handling
2526        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
2527
2528        match &range_type {
2529            pingora_proxy::RangeType::None => {
2530                trace!(
2531                    correlation_id = %ctx.trace_id,
2532                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2533                    "No range request or not applicable"
2534                );
2535            }
2536            pingora_proxy::RangeType::Single(range) => {
2537                trace!(
2538                    correlation_id = %ctx.trace_id,
2539                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2540                    range_start = range.start,
2541                    range_end = range.end,
2542                    "Processing single-range request"
2543                );
2544            }
2545            pingora_proxy::RangeType::Multi(multi) => {
2546                trace!(
2547                    correlation_id = %ctx.trace_id,
2548                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2549                    range_count = multi.ranges.len(),
2550                    "Processing multi-range request"
2551                );
2552            }
2553            pingora_proxy::RangeType::Invalid => {
2554                debug!(
2555                    correlation_id = %ctx.trace_id,
2556                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2557                    "Invalid range header"
2558                );
2559            }
2560        }
2561
2562        range_type
2563    }
2564
2565    /// Handle fatal proxy errors by generating custom error pages.
2566    /// Called when the proxy itself fails to process the request.
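    /// The client receives a minimal JSON error body plus `Connection: close` and an
    /// `X-Correlation-Id` header. Illustrative shape of the body (mirrors the `format!`
    /// call below; the trace id shown is a placeholder):
    ///
    /// ```
    /// let body = format!(
    ///     r#"{{"error":"{} {}","trace_id":"{}"}}"#,
    ///     504, "Gateway Timeout", "abc123"
    /// );
    /// assert_eq!(body, r#"{"error":"504 Gateway Timeout","trace_id":"abc123"}"#);
    /// ```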
2567    async fn fail_to_proxy(
2568        &self,
2569        session: &mut Session,
2570        e: &Error,
2571        ctx: &mut Self::CTX,
2572    ) -> pingora_proxy::FailToProxy
2573    where
2574        Self::CTX: Send + Sync,
2575    {
2576        let error_code = match e.etype() {
2577            // Connection errors
2578            ErrorType::ConnectRefused => 503,
2579            ErrorType::ConnectTimedout => 504,
2580            ErrorType::ConnectNoRoute => 502,
2581
2582            // Timeout errors
2583            ErrorType::ReadTimedout => 504,
2584            ErrorType::WriteTimedout => 504,
2585
2586            // TLS errors
2587            ErrorType::TLSHandshakeFailure => 502,
2588            ErrorType::InvalidCert => 502,
2589
2590            // Protocol errors
2591            ErrorType::InvalidHTTPHeader => 400,
2592            ErrorType::H2Error => 502,
2593
2594            // Resource errors
2595            ErrorType::ConnectProxyFailure => 502,
2596            ErrorType::ConnectionClosed => 502,
2597
2598            // Explicit HTTP status (e.g., from agent fail-closed blocking)
2599            ErrorType::HTTPStatus(status) => *status,
2600
2601            // Internal errors - return 502 for upstream issues (more accurate than 500)
2602            ErrorType::InternalError => {
2603                // Check if this is an upstream-related error
2604                let error_str = e.to_string();
2605                if error_str.contains("upstream")
2606                    || error_str.contains("DNS")
2607                    || error_str.contains("resolve")
2608                {
2609                    502
2610                } else {
2611                    500
2612                }
2613            }
2614
2615            // Default to 502 for unknown errors
2616            _ => 502,
2617        };
2618
2619        error!(
2620            correlation_id = %ctx.trace_id,
2621            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2622            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2623            error_type = ?e.etype(),
2624            error = %e,
2625            error_code = error_code,
2626            "Proxy error occurred"
2627        );
2628
2629        // Record the error in metrics
2630        self.metrics
2631            .record_blocked_request(&format!("proxy_error_{}", error_code));
2632
2633        // Write error response to ensure client receives a proper HTTP response
2634        // This is necessary because some errors occur before the upstream connection
2635        // is established, and Pingora may not send a response automatically
2636        let error_message = match error_code {
2637            400 => "Bad Request",
2638            502 => "Bad Gateway",
2639            503 => "Service Unavailable",
2640            504 => "Gateway Timeout",
2641            _ => "Internal Server Error",
2642        };
2643
2644        // Build a minimal error response body
2645        let body = format!(
2646            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
2647            error_code, error_message, ctx.trace_id
2648        );
2649
2650        // Write the response header
2651        let mut header = pingora::http::ResponseHeader::build(error_code, None)
            // Fall back to 502 if an out-of-range status code was supplied (avoids a panic here)
            .unwrap_or_else(|_| {
                pingora::http::ResponseHeader::build(502u16, None).expect("502 is a valid status")
            });
2652        header
2653            .insert_header("Content-Type", "application/json")
2654            .ok();
2655        header
2656            .insert_header("Content-Length", body.len().to_string())
2657            .ok();
2658        header
2659            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
2660            .ok();
2661        header.insert_header("Connection", "close").ok();
2662
2663        // Write headers and body
2664        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
2665            warn!(
2666                correlation_id = %ctx.trace_id,
2667                error = %write_err,
2668                "Failed to write error response header"
2669            );
2670        } else {
2671            // Write the body
2672            if let Err(write_err) = session
2673                .write_response_body(Some(bytes::Bytes::from(body)), true)
2674                .await
2675            {
2676                warn!(
2677                    correlation_id = %ctx.trace_id,
2678                    error = %write_err,
2679                    "Failed to write error response body"
2680                );
2681            }
2682        }
2683
2684        // Return the error response info
2685        // can_reuse_downstream: false because we wrote our own response and asked the client to close the connection
2686        pingora_proxy::FailToProxy {
2687            error_code,
2688            can_reuse_downstream: false,
2689        }
2690    }
2691
2692    /// Handle errors that occur during proxying after upstream connection is established.
2693    ///
2694    /// This method enables retry logic and circuit breaker integration.
2695    /// It's called when an error occurs during the request/response exchange
2696    /// with the upstream server.
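    ///
    /// For example, a `ConnectTimedout` on a fresh connection is marked as safe to retry so
    /// the next attempt can be made, while the same error on a reused connection is retried
    /// only if the request retry buffer is still intact and the body can be replayed.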
2697    fn error_while_proxy(
2698        &self,
2699        peer: &HttpPeer,
2700        session: &mut Session,
2701        e: Box<Error>,
2702        ctx: &mut Self::CTX,
2703        client_reused: bool,
2704    ) -> Box<Error> {
2705        let error_type = e.etype().clone();
2706        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2707
2708        // Classify error for retry decisions
2709        let is_retryable = matches!(
2710            error_type,
2711            ErrorType::ConnectTimedout
2712                | ErrorType::ReadTimedout
2713                | ErrorType::WriteTimedout
2714                | ErrorType::ConnectionClosed
2715                | ErrorType::ConnectRefused
2716        );
2717
2718        // Log the error with context
2719        warn!(
2720            correlation_id = %ctx.trace_id,
2721            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2722            upstream = %upstream_id,
2723            peer_address = %peer.address(),
2724            error_type = ?error_type,
2725            error = %e,
2726            client_reused = client_reused,
2727            is_retryable = is_retryable,
2728            "Error during proxy operation"
2729        );
2730
2731        // Record failure with circuit breaker via upstream pool
2732        // This is done asynchronously since we can't await in a sync fn
2733        let peer_address = peer.address().to_string();
2734        let upstream_pools = self.upstream_pools.clone();
2735        let upstream_id_owned = upstream_id.to_string();
2736        tokio::spawn(async move {
2737            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2738                pool.report_result(&peer_address, false).await;
2739            }
2740        });
2741
2742        // Metrics tracking
2743        self.metrics
2744            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2745
2746        // Create enhanced error with retry information
2747        let mut enhanced_error = e.more_context(format!(
2748            "Upstream: {}, Peer: {}, Attempts: {}",
2749            upstream_id,
2750            peer.address(),
2751            ctx.upstream_attempts
2752        ));
2753
2754        // Determine if retry should be attempted:
2755        // - Only retryable error types are eligible
2756        // - Requests on reused connections are retried only if the retry buffer is
2757        //   intact (otherwise the request body can no longer be replayed)
2758        if is_retryable {
2759            let can_retry = if client_reused {
2760                // For reused connections, check if retry buffer is intact
2761                !session.as_ref().retry_buffer_truncated()
2762            } else {
2763                // Fresh connections can always retry
2764                true
2765            };
2766
2767            enhanced_error.retry.decide_reuse(can_retry);
2768
2769            if can_retry {
2770                debug!(
2771                    correlation_id = %ctx.trace_id,
2772                    upstream = %upstream_id,
2773                    error_type = ?error_type,
2774                    "Error is retryable, will attempt retry"
2775                );
2776            }
2777        } else {
2778            // Non-retryable error - don't retry
2779            enhanced_error.retry.decide_reuse(false);
2780        }
2781
2782        enhanced_error
2783    }
2784
2785    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
2786        // Decrement active requests
2787        self.reload_coordinator.dec_requests();
2788
2789        // === Fire pending shadow request (if body buffering was enabled) ===
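        // Shadow requests that need the request body are deferred until logging() so the
        // buffered body is complete before the mirrored copy is fired.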
2790        if !ctx.shadow_sent {
2791            if let Some(shadow_pending) = ctx.shadow_pending.take() {
2792                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
2793                    // Clone the buffered body for the shadow request
2794                    Some(ctx.body_buffer.clone())
2795                } else {
2796                    None
2797                };
2798
2799                trace!(
2800                    correlation_id = %ctx.trace_id,
2801                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
2802                    "Firing deferred shadow request with buffered body"
2803                );
2804
2805                shadow_pending.manager.shadow_request(
2806                    shadow_pending.headers,
2807                    body,
2808                    shadow_pending.request_ctx,
2809                );
2810                ctx.shadow_sent = true;
2811            }
2812        }
2813
2814        let duration = ctx.elapsed();
2815
2816        // Get response status
2817        let status = session
2818            .response_written()
2819            .map(|r| r.status.as_u16())
2820            .unwrap_or(0);
2821
2822        // Report result to load balancer for adaptive LB feedback
2823        // This enables latency-aware weight adjustment
2824        if let (Some(ref peer_addr), Some(ref upstream_id)) =
2825            (&ctx.selected_upstream_address, &ctx.upstream)
2826        {
2827            // Success = a response was written (status > 0) with a non-5xx status; client errors are not upstream failures
2828            let success = status > 0 && status < 500;
2829
2830            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
2831                pool.report_result_with_latency(peer_addr, success, Some(duration))
2832                    .await;
2833                trace!(
2834                    correlation_id = %ctx.trace_id,
2835                    upstream = %upstream_id,
2836                    peer_address = %peer_addr,
2837                    success = success,
2838                    duration_ms = duration.as_millis(),
2839                    status = status,
2840                    "Reported result to adaptive load balancer"
2841                );
2842            }
2843
2844            // Track warmth for inference routes (cold model detection)
2845            if ctx.inference_rate_limit_enabled && success {
2846                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
2847                if cold_detected {
2848                    debug!(
2849                        correlation_id = %ctx.trace_id,
2850                        upstream = %upstream_id,
2851                        peer_address = %peer_addr,
2852                        duration_ms = duration.as_millis(),
2853                        "Cold model detected on inference upstream"
2854                    );
2855                }
2856            }
2857        }
2858
2859        // Record actual token usage for inference rate limiting
2860        // This adjusts the token bucket based on actual vs estimated tokens
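        // e.g. a request admitted with an estimate of 800 tokens that actually consumed 1,200
        // is reconciled here (the exact credit/debit policy lives in the rate limit manager).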
2861        if ctx.inference_rate_limit_enabled {
2862            if let (Some(route_id), Some(ref rate_limit_key)) =
2863                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
2864            {
2865                // Try to extract actual tokens from response headers
2866                let response_headers = session
2867                    .response_written()
2868                    .map(|r| &r.headers)
2869                    .cloned()
2870                    .unwrap_or_default();
2871
2872                // For streaming responses, finalize the streaming token counter
2873                let streaming_result = if ctx.inference_streaming_response {
2874                    ctx.inference_streaming_counter
2875                        .as_ref()
2876                        .map(|counter| counter.finalize())
2877                } else {
2878                    None
2879                };
2880
2881                // Log streaming token count info
2882                if let Some(ref result) = streaming_result {
2883                    debug!(
2884                        correlation_id = %ctx.trace_id,
2885                        output_tokens = result.output_tokens,
2886                        input_tokens = ?result.input_tokens,
2887                        source = ?result.source,
2888                        content_length = result.content_length,
2889                        "Finalized streaming token count"
2890                    );
2891                }
2892
2893                // PII detection guardrail (for streaming inference responses)
2894                if ctx.inference_streaming_response {
2895                    if let Some(ref route_config) = ctx.route_config {
2896                        if let Some(ref inference) = route_config.inference {
2897                            if let Some(ref guardrails) = inference.guardrails {
2898                                if let Some(ref pii_config) = guardrails.pii_detection {
2899                                    if pii_config.enabled {
2900                                        // Get accumulated content from streaming counter
2901                                        if let Some(ref counter) = ctx.inference_streaming_counter {
2902                                            let response_content = counter.content();
2903                                            if !response_content.is_empty() {
2904                                                let pii_result = self
2905                                                    .guardrail_processor
2906                                                    .check_pii(
2907                                                        pii_config,
2908                                                        response_content,
2909                                                        ctx.route_id.as_deref(),
2910                                                        &ctx.trace_id,
2911                                                    )
2912                                                    .await;
2913
2914                                                match pii_result {
2915                                                    crate::inference::PiiCheckResult::Detected {
2916                                                        detections,
2917                                                        redacted_content: _,
2918                                                    } => {
2919                                                        warn!(
2920                                                            correlation_id = %ctx.trace_id,
2921                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2922                                                            detection_count = detections.len(),
2923                                                            "PII detected in inference response"
2924                                                        );
2925
2926                                                        // Store detection categories for logging
2927                                                        ctx.pii_detection_categories = detections
2928                                                            .iter()
2929                                                            .map(|d| d.category.clone())
2930                                                            .collect();
2931
2932                                                        // Record metrics for each category
2933                                                        for detection in &detections {
2934                                                            self.metrics.record_pii_detected(
2935                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
2936                                                                &detection.category,
2937                                                            );
2938                                                        }
2939                                                    }
2940                                                    crate::inference::PiiCheckResult::Clean => {
2941                                                        trace!(
2942                                                            correlation_id = %ctx.trace_id,
2943                                                            "No PII detected in response"
2944                                                        );
2945                                                    }
2946                                                    crate::inference::PiiCheckResult::Error { message } => {
2947                                                        debug!(
2948                                                            correlation_id = %ctx.trace_id,
2949                                                            error = %message,
2950                                                            "PII detection check failed"
2951                                                        );
2952                                                    }
2953                                                }
2954                                            }
2955                                        }
2956                                    }
2957                                }
2958                            }
2959                        }
2960                    }
2961                }
2962
2963                // Parsing the response body here would require buffering it, which is expensive.
2964                // For non-streaming responses, most LLM APIs report token counts in headers;
2965                // for streaming responses, we use the content accumulated from the SSE stream.
2966                let empty_body: &[u8] = &[];
2967
2968                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
2969                    route_id,
2970                    rate_limit_key,
2971                    &response_headers,
2972                    empty_body,
2973                    ctx.inference_estimated_tokens,
2974                ) {
2975                    // Use streaming result if available and header extraction failed
2976                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result {
2977                        // Prefer API-provided counts from streaming, otherwise use tiktoken count
2978                        if streaming.total_tokens.is_some() {
2979                            (streaming.total_tokens.unwrap(), "streaming_api")
2980                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated {
2981                            // Header extraction failed, use streaming tiktoken count
2982                            // Estimate total by adding input estimate + output from streaming
2983                            let total = ctx.inference_input_tokens + streaming.output_tokens;
2984                            (total, "streaming_tiktoken")
2985                        } else {
2986                            (actual_estimate.tokens, "headers")
2987                        }
2988                    } else {
2989                        (actual_estimate.tokens, "headers")
2990                    };
2991
2992                    ctx.inference_actual_tokens = Some(actual_tokens);
2993
2994                    debug!(
2995                        correlation_id = %ctx.trace_id,
2996                        route_id = route_id,
2997                        estimated_tokens = ctx.inference_estimated_tokens,
2998                        actual_tokens = actual_tokens,
2999                        source = source_info,
3000                        streaming_response = ctx.inference_streaming_response,
3001                        model = ?ctx.inference_model,
3002                        "Recorded actual inference tokens"
3003                    );
3004
3005                    // Record budget usage with actual tokens (if budget tracking enabled)
3006                    if ctx.inference_budget_enabled {
3007                        let alerts = self.inference_rate_limit_manager.record_budget(
3008                            route_id,
3009                            rate_limit_key,
3010                            actual_tokens,
3011                        );
3012
3013                        // Log any budget alerts that fired
3014                        for alert in alerts.iter() {
3015                            warn!(
3016                                correlation_id = %ctx.trace_id,
3017                                route_id = route_id,
3018                                tenant = %alert.tenant,
3019                                threshold_pct = alert.threshold * 100.0,
3020                                tokens_used = alert.tokens_used,
3021                                tokens_limit = alert.tokens_limit,
3022                                "Token budget alert threshold crossed"
3023                            );
3024                        }
3025
3026                        // Update context with remaining budget
3027                        if let Some(status) = self.inference_rate_limit_manager.budget_status(
3028                            route_id,
3029                            rate_limit_key,
3030                        ) {
3031                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3032                        }
3033                    }
3034
3035                    // Calculate cost if cost attribution is enabled
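                    // Illustrative arithmetic, assuming per-1K-token pricing of $0.003 input and
                    // $0.015 output: 1,000 input + 500 output tokens => 0.003 + 0.0075 = $0.0105.
                    // Actual pricing comes from the route's cost configuration.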
3036                    if ctx.inference_cost_enabled {
3037                        if let Some(model) = ctx.inference_model.as_deref() {
3038                            // Use streaming result for more accurate input/output split if available
3039                            let (input_tokens, output_tokens) = if let Some(ref streaming) = streaming_result {
3040                                // Streaming gives us accurate output tokens
3041                                let input = streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3042                                let output = streaming.output_tokens;
3043                                (input, output)
3044                            } else {
3045                                // Fallback: estimate output from total - input
3046                                let input = ctx.inference_input_tokens;
3047                                let output = actual_tokens.saturating_sub(input);
3048                                (input, output)
3049                            };
3050                            ctx.inference_output_tokens = output_tokens;
3051
3052                            if let Some(cost_result) = self.inference_rate_limit_manager.calculate_cost(
3053                                route_id,
3054                                model,
3055                                input_tokens,
3056                                output_tokens,
3057                            ) {
3058                                ctx.inference_request_cost = Some(cost_result.total_cost);
3059
3060                                trace!(
3061                                    correlation_id = %ctx.trace_id,
3062                                    route_id = route_id,
3063                                    model = model,
3064                                    input_tokens = input_tokens,
3065                                    output_tokens = output_tokens,
3066                                    total_cost = cost_result.total_cost,
3067                                    currency = %cost_result.currency,
3068                                    "Calculated inference request cost"
3069                                );
3070                            }
3071                        }
3072                    }
3073                }
3074            }
3075        }
3076
3077        // Write to access log file if configured
3078        if self.log_manager.access_log_enabled() {
3079            let access_entry = AccessLogEntry {
3080                timestamp: chrono::Utc::now().to_rfc3339(),
3081                trace_id: ctx.trace_id.clone(),
3082                method: ctx.method.clone(),
3083                path: ctx.path.clone(),
3084                query: ctx.query.clone(),
3085                protocol: "HTTP/1.1".to_string(),
3086                status,
3087                body_bytes: ctx.response_bytes,
3088                duration_ms: duration.as_millis() as u64,
3089                client_ip: ctx.client_ip.clone(),
3090                user_agent: ctx.user_agent.clone(),
3091                referer: ctx.referer.clone(),
3092                host: ctx.host.clone(),
3093                route_id: ctx.route_id.clone(),
3094                upstream: ctx.upstream.clone(),
3095                upstream_attempts: ctx.upstream_attempts,
3096                instance_id: self.app_state.instance_id.clone(),
3097                namespace: ctx.namespace.clone(),
3098                service: ctx.service.clone(),
3099                // Extended access log fields
3100                body_bytes_sent: ctx.response_bytes,
3101                upstream_addr: ctx.selected_upstream_address.clone(),
3102                connection_reused: ctx.connection_reused,
3103                rate_limit_hit: status == 429,
3104                geo_country: ctx.geo_country_code.clone(),
3105            };
3106            self.log_manager.log_access(&access_entry);
3107        }
3108
3109        // Log to tracing at debug level (avoid allocations if debug disabled)
3110        if tracing::enabled!(tracing::Level::DEBUG) {
3111            debug!(
3112                trace_id = %ctx.trace_id,
3113                method = %ctx.method,
3114                path = %ctx.path,
3115                route_id = ?ctx.route_id,
3116                upstream = ?ctx.upstream,
3117                status = status,
3118                duration_ms = duration.as_millis() as u64,
3119                upstream_attempts = ctx.upstream_attempts,
3120                error = ?_error.map(|e| e.to_string()),
3121                "Request completed"
3122            );
3123        }
3124
3125        // Log WebSocket upgrades at info level
3126        if ctx.is_websocket_upgrade && status == 101 {
3127            info!(
3128                trace_id = %ctx.trace_id,
3129                route_id = ?ctx.route_id,
3130                upstream = ?ctx.upstream,
3131                client_ip = %ctx.client_ip,
3132                "WebSocket connection established"
3133            );
3134        }
3135
3136        // End OpenTelemetry span
3137        if let Some(span) = ctx.otel_span.take() {
3138            span.end();
3139        }
3140    }
3141}
3142
3143// =============================================================================
3144// Helper methods for body streaming (not part of ProxyHttp trait)
3145// =============================================================================
3146
3147impl SentinelProxy {
3148    /// Process a single body chunk in streaming mode.
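    ///
    /// Each chunk is forwarded to the configured inspection agents as it arrives; an agent
    /// may pass the chunk through, rewrite it, drop it, or block the request outright
    /// (for example, a DLP-style agent could redact a chunk before it reaches the upstream).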
3149    async fn process_body_chunk_streaming(
3150        &self,
3151        body: &mut Option<Bytes>,
3152        end_of_stream: bool,
3153        ctx: &mut RequestContext,
3154    ) -> Result<(), Box<Error>> {
3155        // Clone the chunk data to avoid borrowing issues when mutating body later
3156        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3157        let chunk_index = ctx.request_body_chunk_index;
3158        ctx.request_body_chunk_index += 1;
3159        ctx.body_bytes_inspected += chunk_data.len() as u64;
3160
3161        debug!(
3162            correlation_id = %ctx.trace_id,
3163            chunk_index = chunk_index,
3164            chunk_size = chunk_data.len(),
3165            end_of_stream = end_of_stream,
3166            "Streaming body chunk to agents"
3167        );
3168
3169        // Create agent call context
3170        let agent_ctx = crate::agents::AgentCallContext {
3171            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3172            metadata: sentinel_agent_protocol::RequestMetadata {
3173                correlation_id: ctx.trace_id.clone(),
3174                request_id: ctx.trace_id.clone(),
3175                client_ip: ctx.client_ip.clone(),
3176                client_port: 0,
3177                server_name: ctx.host.clone(),
3178                protocol: "HTTP/1.1".to_string(),
3179                tls_version: None,
3180                tls_cipher: None,
3181                route_id: ctx.route_id.clone(),
3182                upstream_id: ctx.upstream.clone(),
3183                timestamp: chrono::Utc::now().to_rfc3339(),
3184                traceparent: ctx.traceparent(),
3185            },
3186            route_id: ctx.route_id.clone(),
3187            upstream_id: ctx.upstream.clone(),
3188            request_body: None, // Not used in streaming mode
3189            response_body: None,
3190        };
3191
3192        let agent_ids = ctx.body_inspection_agents.clone();
3193        let total_size = None; // Unknown in streaming mode
3194
3195        match self
3196            .agent_manager
3197            .process_request_body_streaming(
3198                &agent_ctx,
3199                &chunk_data,
3200                end_of_stream,
3201                chunk_index,
3202                ctx.body_bytes_inspected as usize,
3203                total_size,
3204                &agent_ids,
3205            )
3206            .await
3207        {
3208            Ok(decision) => {
3209                // Track if agent needs more data
3210                ctx.agent_needs_more = decision.needs_more;
3211
3212                // Apply body mutation if present
3213                if let Some(ref mutation) = decision.request_body_mutation {
3214                    if !mutation.is_pass_through() {
3215                        if mutation.is_drop() {
3216                            // Drop the chunk
3217                            *body = None;
3218                            trace!(
3219                                correlation_id = %ctx.trace_id,
3220                                chunk_index = chunk_index,
3221                                "Agent dropped body chunk"
3222                            );
3223                        } else if let Some(ref new_data) = mutation.data {
3224                            // Replace chunk with mutated content
3225                            *body = Some(Bytes::from(new_data.clone()));
3226                            trace!(
3227                                correlation_id = %ctx.trace_id,
3228                                chunk_index = chunk_index,
3229                                original_size = chunk_data.len(),
3230                                new_size = new_data.len(),
3231                                "Agent mutated body chunk"
3232                            );
3233                        }
3234                    }
3235                }
3236
3237                // Check decision (only final if needs_more is false)
3238                if !decision.needs_more && !decision.is_allow() {
3239                    warn!(
3240                        correlation_id = %ctx.trace_id,
3241                        action = ?decision.action,
3242                        "Agent blocked request body"
3243                    );
3244                    self.metrics.record_blocked_request("agent_body_inspection");
3245
3246                    let (status, message) = match &decision.action {
3247                        crate::agents::AgentAction::Block { status, body, .. } => (
3248                            *status,
3249                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3250                        ),
3251                        _ => (403, "Forbidden".to_string()),
3252                    };
3253
3254                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3255                }
3256
3257                trace!(
3258                    correlation_id = %ctx.trace_id,
3259                    needs_more = decision.needs_more,
3260                    "Agent processed body chunk"
3261                );
3262            }
3263            Err(e) => {
3264                let fail_closed = ctx
3265                    .route_config
3266                    .as_ref()
3267                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3268                    .unwrap_or(false);
3269
3270                if fail_closed {
3271                    error!(
3272                        correlation_id = %ctx.trace_id,
3273                        error = %e,
3274                        "Agent streaming body inspection failed, blocking (fail-closed)"
3275                    );
3276                    return Err(Error::explain(
3277                        ErrorType::HTTPStatus(503),
3278                        "Service unavailable",
3279                    ));
3280                } else {
3281                    warn!(
3282                        correlation_id = %ctx.trace_id,
3283                        error = %e,
3284                        "Agent streaming body inspection failed, allowing (fail-open)"
3285                    );
3286                }
3287            }
3288        }
3289
3290        Ok(())
3291    }
3292
3293    /// Send buffered body to agents (buffer mode).
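    ///
    /// In buffer mode the complete request body has already been accumulated in
    /// `ctx.body_buffer`; it is optionally decompressed first (subject to the configured
    /// ratio and output-size limits) and then handed to the agents in a single call.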
3294    async fn send_buffered_body_to_agents(
3295        &self,
3296        end_of_stream: bool,
3297        ctx: &mut RequestContext,
3298    ) -> Result<(), Box<Error>> {
3299        debug!(
3300            correlation_id = %ctx.trace_id,
3301            buffer_size = ctx.body_buffer.len(),
3302            end_of_stream = end_of_stream,
3303            agent_count = ctx.body_inspection_agents.len(),
3304            decompression_enabled = ctx.decompression_enabled,
3305            "Sending buffered body to agents for inspection"
3306        );
3307
3308        // Decompress body if enabled and we have a supported encoding
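        // e.g. a compressed (gzip-style) JSON body is inflated so agents inspect plaintext;
        // bodies that would expand past max_ratio or max_output_bytes fail decompression and
        // are handled per the route's failure mode below, guarding against decompression bombs.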
3309        let body_for_inspection = if ctx.decompression_enabled {
3310            if let Some(ref encoding) = ctx.body_content_encoding {
3311                let config = crate::decompression::DecompressionConfig {
3312                    max_ratio: ctx.max_decompression_ratio,
3313                    max_output_bytes: ctx.max_decompression_bytes,
3314                };
3315
3316                match crate::decompression::decompress_body(
3317                    &ctx.body_buffer,
3318                    encoding,
3319                    &config,
3320                ) {
3321                    Ok(result) => {
3322                        ctx.body_was_decompressed = true;
3323                        self.metrics
3324                            .record_decompression_success(encoding, result.ratio);
3325                        debug!(
3326                            correlation_id = %ctx.trace_id,
3327                            encoding = %encoding,
3328                            compressed_size = result.compressed_size,
3329                            decompressed_size = result.decompressed_size,
3330                            ratio = result.ratio,
3331                            "Body decompressed for agent inspection"
3332                        );
3333                        result.data
3334                    }
3335                    Err(e) => {
3336                        // Record failure metric
3337                        let failure_reason = match &e {
3338                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
3339                                "ratio_exceeded"
3340                            }
3341                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
3342                                "size_exceeded"
3343                            }
3344                            crate::decompression::DecompressionError::InvalidData { .. } => {
3345                                "invalid_data"
3346                            }
3347                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
3348                                "unsupported"
3349                            }
3350                            crate::decompression::DecompressionError::IoError(_) => "io_error",
3351                        };
3352                        self.metrics
3353                            .record_decompression_failure(encoding, failure_reason);
3354
3355                        // Decompression failed - decide based on failure mode
3356                        let fail_closed = ctx
3357                            .route_config
3358                            .as_ref()
3359                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3360                            .unwrap_or(false);
3361
3362                        if fail_closed {
3363                            error!(
3364                                correlation_id = %ctx.trace_id,
3365                                error = %e,
3366                                encoding = %encoding,
3367                                "Decompression failed, blocking (fail-closed)"
3368                            );
3369                            return Err(Error::explain(
3370                                ErrorType::HTTPStatus(400),
3371                                "Invalid compressed body",
3372                            ));
3373                        } else {
3374                            warn!(
3375                                correlation_id = %ctx.trace_id,
3376                                error = %e,
3377                                encoding = %encoding,
3378                                "Decompression failed, sending compressed body (fail-open)"
3379                            );
3380                            ctx.body_buffer.clone()
3381                        }
3382                    }
3383                }
3384            } else {
3385                ctx.body_buffer.clone()
3386            }
3387        } else {
3388            ctx.body_buffer.clone()
3389        };
3390
3391        let agent_ctx = crate::agents::AgentCallContext {
3392            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
3393            metadata: sentinel_agent_protocol::RequestMetadata {
3394                correlation_id: ctx.trace_id.clone(),
3395                request_id: ctx.trace_id.clone(),
3396                client_ip: ctx.client_ip.clone(),
3397                client_port: 0,
3398                server_name: ctx.host.clone(),
3399                protocol: "HTTP/1.1".to_string(),
3400                tls_version: None,
3401                tls_cipher: None,
3402                route_id: ctx.route_id.clone(),
3403                upstream_id: ctx.upstream.clone(),
3404                timestamp: chrono::Utc::now().to_rfc3339(),
3405                traceparent: ctx.traceparent(),
3406            },
3407            route_id: ctx.route_id.clone(),
3408            upstream_id: ctx.upstream.clone(),
3409            request_body: Some(body_for_inspection.clone()),
3410            response_body: None,
3411        };
3412
3413        let agent_ids = ctx.body_inspection_agents.clone();
3414        match self
3415            .agent_manager
3416            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3417            .await
3418        {
3419            Ok(decision) => {
3420                if !decision.is_allow() {
3421                    warn!(
3422                        correlation_id = %ctx.trace_id,
3423                        action = ?decision.action,
3424                        "Agent blocked request body"
3425                    );
3426                    self.metrics.record_blocked_request("agent_body_inspection");
3427
3428                    let (status, message) = match &decision.action {
3429                        crate::agents::AgentAction::Block { status, body, .. } => (
3430                            *status,
3431                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3432                        ),
3433                        _ => (403, "Forbidden".to_string()),
3434                    };
3435
3436                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3437                }
3438
3439                trace!(
3440                    correlation_id = %ctx.trace_id,
3441                    "Agent allowed request body"
3442                );
3443            }
3444            Err(e) => {
3445                let fail_closed = ctx
3446                    .route_config
3447                    .as_ref()
3448                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
3449                    .unwrap_or(false);
3450
3451                if fail_closed {
3452                    error!(
3453                        correlation_id = %ctx.trace_id,
3454                        error = %e,
3455                        "Agent body inspection failed, blocking (fail-closed)"
3456                    );
3457                    return Err(Error::explain(
3458                        ErrorType::HTTPStatus(503),
3459                        "Service unavailable",
3460                    ));
3461                } else {
3462                    warn!(
3463                        correlation_id = %ctx.trace_id,
3464                        error = %e,
3465                        "Agent body inspection failed, allowing (fail-open)"
3466                    );
3467                }
3468            }
3469        }
3470
3471        Ok(())
3472    }
3473}