Skip to main content

grapsus_proxy/proxy/
http_trait.rs

1//! ProxyHttp trait implementation for GrapsusProxy.
2//!
3//! This module contains the Pingora ProxyHttp trait implementation which defines
4//! the core request/response lifecycle handling.
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14    CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::disk_cache::DiskHitHandler;
23use crate::hybrid_cache::HybridHitHandler;
24use crate::inference::{
25    extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
26};
27use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
28use crate::rate_limit::HeaderAccessor;
29use crate::routing::RequestInfo;
30
31use super::context::{FallbackReason, RequestContext};
32use super::fallback::FallbackEvaluator;
33use super::fallback_metrics::get_fallback_metrics;
34use super::model_routing;
35use super::model_routing_metrics::get_model_routing_metrics;
36use super::GrapsusProxy;
37
38/// Helper type for rate limiting when we don't need header access
39struct NoHeaderAccessor;
40impl HeaderAccessor for NoHeaderAccessor {
41    fn get_header(&self, _name: &str) -> Option<String> {
42        None
43    }
44}
45
46#[async_trait]
47impl ProxyHttp for GrapsusProxy {
48    type CTX = RequestContext;
49
50    fn new_ctx(&self) -> Self::CTX {
51        RequestContext::new()
52    }
53
54    fn fail_to_connect(
55        &self,
56        _session: &mut Session,
57        peer: &HttpPeer,
58        ctx: &mut Self::CTX,
59        e: Box<Error>,
60    ) -> Box<Error> {
61        error!(
62            correlation_id = %ctx.trace_id,
63            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
64            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
65            peer_address = %peer.address(),
66            error = %e,
67            "Failed to connect to upstream peer"
68        );
69        self.log_manager.log_request_error(
70            "error",
71            "Failed to connect to upstream peer",
72            &ctx.trace_id,
73            ctx.route_id.as_deref(),
74            ctx.upstream.as_deref(),
75            Some(format!("peer={} error={}", peer.address(), e)),
76        );
77        // Custom error pages are handled in response_filter
78        e
79    }
80
81    /// Early request filter - runs before upstream selection
82    /// Used to handle builtin routes that don't need an upstream connection
83    async fn early_request_filter(
84        &self,
85        session: &mut Session,
86        ctx: &mut Self::CTX,
87    ) -> Result<(), Box<Error>> {
88        // Extract request info for routing
89        let req_header = session.req_header();
90        let method = req_header.method.as_str();
91        let path = req_header.uri.path();
92        let host = req_header
93            .headers
94            .get("host")
95            .and_then(|h| h.to_str().ok())
96            .unwrap_or("");
97
98        // Handle ACME HTTP-01 challenges before any other processing
99        if let Some(ref challenge_manager) = self.acme_challenges {
100            if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
101                if let Some(key_authorization) = challenge_manager.get_response(token) {
102                    debug!(
103                        token = %token,
104                        "Serving ACME HTTP-01 challenge response"
105                    );
106
107                    // Build response
108                    let mut resp = ResponseHeader::build(200, None)?;
109                    resp.insert_header("Content-Type", "text/plain")?;
110                    resp.insert_header("Content-Length", key_authorization.len().to_string())?;
111
112                    // Send response
113                    session.write_response_header(Box::new(resp), false).await?;
114                    session
115                        .write_response_body(Some(Bytes::from(key_authorization)), true)
116                        .await?;
117
118                    // Return error to signal request is complete
119                    return Err(Error::explain(
120                        ErrorType::InternalError,
121                        "ACME challenge served",
122                    ));
123                } else {
124                    // Token not found - could be a stale request or attack
125                    warn!(
126                        token = %token,
127                        "ACME challenge token not found"
128                    );
129                }
130            }
131        }
132
133        ctx.method = method.to_string();
134        ctx.path = path.to_string();
135        ctx.host = Some(host.to_string());
136
137        // Match route to determine service type
138        let route_match = {
139            let route_matcher = self.route_matcher.read();
140            let request_info = RequestInfo::new(method, path, host);
141            match route_matcher.match_request(&request_info) {
142                Some(m) => m,
143                None => return Ok(()), // No matching route, let upstream_peer handle it
144            }
145        };
146
147        ctx.trace_id = self.get_trace_id(session);
148        ctx.route_id = Some(route_match.route_id.to_string());
149        ctx.route_config = Some(route_match.config.clone());
150
151        // Parse incoming W3C trace context if present
152        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
153            if let Ok(s) = traceparent.to_str() {
154                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
155            }
156        }
157
158        // Start OpenTelemetry request span if tracing is enabled
159        if let Some(tracer) = crate::otel::get_tracer() {
160            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
161        }
162
163        // Check if this is a builtin handler route
164        if route_match.config.service_type == grapsus_config::ServiceType::Builtin {
165            trace!(
166                correlation_id = %ctx.trace_id,
167                route_id = %route_match.route_id,
168                builtin_handler = ?route_match.config.builtin_handler,
169                "Handling builtin route in early_request_filter"
170            );
171
172            // Handle the builtin route directly
173            let handled = self
174                .handle_builtin_route(session, ctx, &route_match)
175                .await?;
176
177            if handled {
178                // Return error to signal that request is complete (Pingora will not continue)
179                return Err(Error::explain(
180                    ErrorType::InternalError,
181                    "Builtin handler complete",
182                ));
183            }
184        }
185
186        Ok(())
187    }
188
189    async fn upstream_peer(
190        &self,
191        session: &mut Session,
192        ctx: &mut Self::CTX,
193    ) -> Result<Box<HttpPeer>, Box<Error>> {
194        // Track active request
195        self.reload_coordinator.inc_requests();
196
197        // Cache global config once per request (avoids repeated Arc clones)
198        if ctx.config.is_none() {
199            ctx.config = Some(self.config_manager.current());
200        }
201
202        // Cache client address for logging if not already set
203        if ctx.client_ip.is_empty() {
204            ctx.client_ip = session
205                .client_addr()
206                .map(|a| a.to_string())
207                .unwrap_or_else(|| "unknown".to_string());
208        }
209
210        let req_header = session.req_header();
211
212        // Cache request info for access logging if not already set
213        if ctx.method.is_empty() {
214            ctx.method = req_header.method.to_string();
215            ctx.path = req_header.uri.path().to_string();
216            ctx.query = req_header.uri.query().map(|q| q.to_string());
217            ctx.host = req_header
218                .headers
219                .get("host")
220                .and_then(|v| v.to_str().ok())
221                .map(|s| s.to_string());
222        }
223        ctx.user_agent = req_header
224            .headers
225            .get("user-agent")
226            .and_then(|v| v.to_str().ok())
227            .map(|s| s.to_string());
228        ctx.referer = req_header
229            .headers
230            .get("referer")
231            .and_then(|v| v.to_str().ok())
232            .map(|s| s.to_string());
233
234        trace!(
235            correlation_id = %ctx.trace_id,
236            client_ip = %ctx.client_ip,
237            "Request received, initializing context"
238        );
239
240        // Use cached route info if already set by early_request_filter
241        let route_match = if let Some(ref route_config) = ctx.route_config {
242            let route_id = ctx.route_id.as_deref().unwrap_or("");
243            crate::routing::RouteMatch {
244                route_id: grapsus_common::RouteId::new(route_id),
245                config: route_config.clone(),
246            }
247        } else {
248            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
249            let (match_result, route_duration) = {
250                let route_matcher = self.route_matcher.read();
251                let host = ctx.host.as_deref().unwrap_or("");
252
253                // Build request info (zero-copy for common case)
254                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);
255
256                // Only build headers HashMap if any route needs header matching
257                if route_matcher.needs_headers() {
258                    request_info = request_info
259                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
260                }
261
262                // Only parse query params if any route needs query param matching
263                if route_matcher.needs_query_params() {
264                    request_info =
265                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
266                }
267
268                trace!(
269                    correlation_id = %ctx.trace_id,
270                    method = %request_info.method,
271                    path = %request_info.path,
272                    host = %request_info.host,
273                    "Built request info for route matching"
274                );
275
276                let route_start = std::time::Instant::now();
277                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
278                    warn!(
279                        correlation_id = %ctx.trace_id,
280                        method = %request_info.method,
281                        path = %request_info.path,
282                        host = %request_info.host,
283                        "No matching route found for request"
284                    );
285                    self.log_manager.log_request_error(
286                        "warn",
287                        "No matching route found for request",
288                        &ctx.trace_id,
289                        None,
290                        None,
291                        Some(format!(
292                            "method={} path={} host={}",
293                            request_info.method, request_info.path, request_info.host
294                        )),
295                    );
296                    Error::explain(ErrorType::InternalError, "No matching route found")
297                })?;
298                let route_duration = route_start.elapsed();
299                // Lock is dropped here when block ends
300                (route_match, route_duration)
301            };
302
303            ctx.route_id = Some(match_result.route_id.to_string());
304            ctx.route_config = Some(match_result.config.clone());
305
306            // Set trace_id if not already set by early_request_filter
307            if ctx.trace_id.is_empty() {
308                ctx.trace_id = self.get_trace_id(session);
309
310                // Parse incoming W3C trace context if present
311                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
312                    if let Ok(s) = traceparent.to_str() {
313                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
314                    }
315                }
316
317                // Start OpenTelemetry request span if tracing is enabled
318                if let Some(tracer) = crate::otel::get_tracer() {
319                    ctx.otel_span =
320                        Some(tracer.start_span(&ctx.method, &ctx.path, ctx.trace_context.as_ref()));
321                }
322            }
323
324            trace!(
325                correlation_id = %ctx.trace_id,
326                route_id = %match_result.route_id,
327                route_duration_us = route_duration.as_micros(),
328                service_type = ?match_result.config.service_type,
329                "Route matched"
330            );
331            match_result
332        };
333
334        // Check if this is a builtin handler route (no upstream needed)
335        if route_match.config.service_type == grapsus_config::ServiceType::Builtin {
336            trace!(
337                correlation_id = %ctx.trace_id,
338                route_id = %route_match.route_id,
339                builtin_handler = ?route_match.config.builtin_handler,
340                "Route type is builtin, skipping upstream"
341            );
342            // Mark as builtin route for later processing in request_filter
343            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
344            // Return error to skip upstream connection for builtin routes
345            return Err(Error::explain(
346                ErrorType::InternalError,
347                "Builtin handler handled in request_filter",
348            ));
349        }
350
351        // Check if this is a static file route
352        if route_match.config.service_type == grapsus_config::ServiceType::Static {
353            trace!(
354                correlation_id = %ctx.trace_id,
355                route_id = %route_match.route_id,
356                "Route type is static, checking for static server"
357            );
358            // Static routes don't need an upstream
359            if self
360                .static_servers
361                .get(route_match.route_id.as_str())
362                .await
363                .is_some()
364            {
365                // Mark this as a static route for later processing
366                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
367                info!(
368                    correlation_id = %ctx.trace_id,
369                    route_id = %route_match.route_id,
370                    path = %ctx.path,
371                    "Serving static file"
372                );
373                // Return error to avoid upstream connection for static routes
374                return Err(Error::explain(
375                    ErrorType::InternalError,
376                    "Static file serving handled in request_filter",
377                ));
378            }
379        }
380
381        // === Model-based routing (for inference routes) ===
382        // Check if model routing is configured and select upstream based on model
383        let mut model_routing_applied = false;
384        if let Some(ref inference) = route_match.config.inference {
385            if let Some(ref model_routing) = inference.model_routing {
386                // Try to extract model from headers (fast path - no body parsing needed)
387                let model = model_routing::extract_model_from_headers(&req_header.headers);
388
389                if let Some(ref model_name) = model {
390                    // Find upstream for this model
391                    if let Some(routing_result) =
392                        model_routing::find_upstream_for_model(model_routing, model_name)
393                    {
394                        debug!(
395                            correlation_id = %ctx.trace_id,
396                            route_id = %route_match.route_id,
397                            model = %model_name,
398                            upstream = %routing_result.upstream,
399                            is_default = routing_result.is_default,
400                            provider_override = ?routing_result.provider,
401                            "Model-based routing selected upstream"
402                        );
403
404                        ctx.record_model_routing(
405                            &routing_result.upstream,
406                            Some(model_name.clone()),
407                            routing_result.provider,
408                        );
409                        model_routing_applied = true;
410
411                        // Record metrics
412                        if let Some(metrics) = get_model_routing_metrics() {
413                            metrics.record_model_routed(
414                                route_match.route_id.as_str(),
415                                model_name,
416                                &routing_result.upstream,
417                            );
418                            if routing_result.is_default {
419                                metrics.record_default_upstream(route_match.route_id.as_str());
420                            }
421                            if let Some(provider) = routing_result.provider {
422                                metrics.record_provider_override(
423                                    route_match.route_id.as_str(),
424                                    &routing_result.upstream,
425                                    provider.as_str(),
426                                );
427                            }
428                        }
429                    }
430                } else if let Some(ref default_upstream) = model_routing.default_upstream {
431                    // No model in headers, use default upstream
432                    debug!(
433                        correlation_id = %ctx.trace_id,
434                        route_id = %route_match.route_id,
435                        upstream = %default_upstream,
436                        "Model-based routing using default upstream (no model header)"
437                    );
438                    ctx.record_model_routing(default_upstream, None, None);
439                    model_routing_applied = true;
440
441                    // Record metrics for no model header case
442                    if let Some(metrics) = get_model_routing_metrics() {
443                        metrics.record_no_model_header(route_match.route_id.as_str());
444                    }
445                }
446            }
447        }
448
449        // Regular route with upstream (if model routing didn't set it)
450        if !model_routing_applied {
451            if let Some(ref upstream) = route_match.config.upstream {
452                ctx.upstream = Some(upstream.clone());
453                trace!(
454                    correlation_id = %ctx.trace_id,
455                    route_id = %route_match.route_id,
456                    upstream = %upstream,
457                    "Upstream configured for route"
458                );
459            } else {
460                error!(
461                    correlation_id = %ctx.trace_id,
462                    route_id = %route_match.route_id,
463                    "Route has no upstream configured"
464                );
465                return Err(Error::explain(
466                    ErrorType::InternalError,
467                    format!(
468                        "Route '{}' has no upstream configured",
469                        route_match.route_id
470                    ),
471                ));
472            }
473        }
474
475        // === Fallback routing evaluation (pre-request) ===
476        // Check if fallback should be triggered due to health or budget conditions
477        if let Some(ref fallback_config) = route_match.config.fallback {
478            let upstream_name = ctx.upstream.as_ref().unwrap();
479
480            // Check if primary upstream is healthy
481            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
482                pool.has_healthy_targets().await
483            } else {
484                false // Pool not found, treat as unhealthy
485            };
486
487            // Check if budget is exhausted (for inference routes)
488            let is_budget_exhausted = ctx.inference_budget_exhausted;
489
490            // Get model name for model mapping (inference routes)
491            let current_model = ctx.inference_model.as_deref();
492
493            // Create fallback evaluator
494            let evaluator = FallbackEvaluator::new(
495                fallback_config,
496                ctx.tried_upstreams(),
497                ctx.fallback_attempt,
498            );
499
500            // Evaluate pre-request fallback conditions
501            if let Some(decision) = evaluator.should_fallback_before_request(
502                upstream_name,
503                is_healthy,
504                is_budget_exhausted,
505                current_model,
506            ) {
507                info!(
508                    correlation_id = %ctx.trace_id,
509                    route_id = %route_match.route_id,
510                    from_upstream = %upstream_name,
511                    to_upstream = %decision.next_upstream,
512                    reason = %decision.reason,
513                    fallback_attempt = ctx.fallback_attempt + 1,
514                    "Triggering fallback routing"
515                );
516
517                // Record fallback metrics
518                if let Some(metrics) = get_fallback_metrics() {
519                    metrics.record_fallback_attempt(
520                        route_match.route_id.as_str(),
521                        upstream_name,
522                        &decision.next_upstream,
523                        &decision.reason,
524                    );
525                }
526
527                // Record fallback in context
528                ctx.record_fallback(decision.reason, &decision.next_upstream);
529
530                // Apply model mapping if present
531                if let Some((original, mapped)) = decision.model_mapping {
532                    // Record model mapping metrics
533                    if let Some(metrics) = get_fallback_metrics() {
534                        metrics.record_model_mapping(
535                            route_match.route_id.as_str(),
536                            &original,
537                            &mapped,
538                        );
539                    }
540
541                    ctx.record_model_mapping(original, mapped);
542                    trace!(
543                        correlation_id = %ctx.trace_id,
544                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
545                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
546                        "Applied model mapping for fallback"
547                    );
548                }
549            }
550        }
551
552        debug!(
553            correlation_id = %ctx.trace_id,
554            route_id = %route_match.route_id,
555            upstream = ?ctx.upstream,
556            method = %req_header.method,
557            path = %req_header.uri.path(),
558            host = ctx.host.as_deref().unwrap_or("-"),
559            client_ip = %ctx.client_ip,
560            "Processing request"
561        );
562
563        // Get upstream pool (skip for static routes)
564        if ctx
565            .upstream
566            .as_ref()
567            .is_some_and(|u| u.starts_with("_static_"))
568        {
569            // Static routes are handled in request_filter, should not reach here
570            return Err(Error::explain(
571                ErrorType::InternalError,
572                "Static route should be handled in request_filter",
573            ));
574        }
575
576        let upstream_name = ctx
577            .upstream
578            .as_ref()
579            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;
580
581        trace!(
582            correlation_id = %ctx.trace_id,
583            upstream = %upstream_name,
584            "Looking up upstream pool"
585        );
586
587        let pool = self
588            .upstream_pools
589            .get(upstream_name)
590            .await
591            .ok_or_else(|| {
592                error!(
593                    correlation_id = %ctx.trace_id,
594                    upstream = %upstream_name,
595                    "Upstream pool not found"
596                );
597                self.log_manager.log_request_error(
598                    "error",
599                    "Upstream pool not found",
600                    &ctx.trace_id,
601                    ctx.route_id.as_deref(),
602                    Some(upstream_name),
603                    None,
604                );
605                Error::explain(
606                    ErrorType::InternalError,
607                    format!("Upstream pool '{}' not found", upstream_name),
608                )
609            })?;
610
611        // Select peer from pool with retries
612        let max_retries = route_match
613            .config
614            .retry_policy
615            .as_ref()
616            .map(|r| r.max_attempts)
617            .unwrap_or(1);
618
619        trace!(
620            correlation_id = %ctx.trace_id,
621            upstream = %upstream_name,
622            max_retries = max_retries,
623            "Starting upstream peer selection"
624        );
625
626        let mut last_error = None;
627        let selection_start = std::time::Instant::now();
628
629        for attempt in 1..=max_retries {
630            ctx.upstream_attempts = attempt;
631
632            trace!(
633                correlation_id = %ctx.trace_id,
634                upstream = %upstream_name,
635                attempt = attempt,
636                max_retries = max_retries,
637                "Attempting to select upstream peer"
638            );
639
640            match pool.select_peer_with_metadata(None).await {
641                Ok((mut peer, metadata)) => {
642                    let selection_duration = selection_start.elapsed();
643                    // Store selected peer address for feedback reporting in logging()
644                    let peer_addr = peer.address().to_string();
645                    ctx.selected_upstream_address = Some(peer_addr.clone());
646
647                    // Copy sticky session metadata to context for response_filter
648                    if metadata.contains_key("sticky_session_new") {
649                        ctx.sticky_session_new_assignment = true;
650                        ctx.sticky_session_set_cookie =
651                            metadata.get("sticky_set_cookie_header").cloned();
652                        ctx.sticky_target_index = metadata
653                            .get("sticky_target_index")
654                            .and_then(|s| s.parse().ok());
655
656                        trace!(
657                            correlation_id = %ctx.trace_id,
658                            sticky_target_index = ?ctx.sticky_target_index,
659                            "New sticky session assignment, will set cookie"
660                        );
661                    }
662
663                    debug!(
664                        correlation_id = %ctx.trace_id,
665                        upstream = %upstream_name,
666                        peer_address = %peer_addr,
667                        attempt = attempt,
668                        selection_duration_us = selection_duration.as_micros(),
669                        sticky_session_hit = metadata.contains_key("sticky_session_hit"),
670                        sticky_session_new = ctx.sticky_session_new_assignment,
671                        "Selected upstream peer"
672                    );
673                    // Apply per-route policy timeout (lowest priority)
674                    if let Some(ref rc) = ctx.route_config {
675                        if let Some(timeout_secs) = rc.policies.timeout_secs {
676                            peer.options.read_timeout = Some(Duration::from_secs(timeout_secs));
677                        }
678                    }
679
680                    // Apply filter timeout overrides (higher priority, overwrites policy)
681                    if let Some(connect_secs) = ctx.filter_connect_timeout_secs {
682                        peer.options.connection_timeout = Some(Duration::from_secs(connect_secs));
683                    }
684                    if let Some(upstream_secs) = ctx.filter_upstream_timeout_secs {
685                        peer.options.read_timeout = Some(Duration::from_secs(upstream_secs));
686                    }
687
688                    return Ok(Box::new(peer));
689                }
690                Err(e) => {
691                    warn!(
692                        correlation_id = %ctx.trace_id,
693                        upstream = %upstream_name,
694                        attempt = attempt,
695                        max_retries = max_retries,
696                        error = %e,
697                        "Failed to select upstream peer"
698                    );
699                    last_error = Some(e);
700
701                    if attempt < max_retries {
702                        // Exponential backoff (using pingora-timeout for efficiency)
703                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
704                        trace!(
705                            correlation_id = %ctx.trace_id,
706                            backoff_ms = backoff.as_millis(),
707                            "Backing off before retry"
708                        );
709                        sleep(backoff).await;
710                    }
711                }
712            }
713        }
714
715        let selection_duration = selection_start.elapsed();
716        error!(
717            correlation_id = %ctx.trace_id,
718            upstream = %upstream_name,
719            attempts = max_retries,
720            selection_duration_ms = selection_duration.as_millis(),
721            last_error = ?last_error,
722            "All upstream selection attempts failed"
723        );
724        self.log_manager.log_request_error(
725            "error",
726            "All upstream selection attempts failed",
727            &ctx.trace_id,
728            ctx.route_id.as_deref(),
729            Some(upstream_name),
730            Some(format!("attempts={} error={:?}", max_retries, last_error)),
731        );
732
733        // Record exhausted metric if fallback was used but all upstreams failed
734        if ctx.used_fallback() {
735            if let Some(metrics) = get_fallback_metrics() {
736                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
737            }
738        }
739
740        Err(Error::explain(
741            ErrorType::InternalError,
742            format!("All upstream attempts failed: {:?}", last_error),
743        ))
744    }
745
746    async fn request_filter(
747        &self,
748        session: &mut Session,
749        ctx: &mut Self::CTX,
750    ) -> Result<bool, Box<Error>> {
751        trace!(
752            correlation_id = %ctx.trace_id,
753            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
754            "Starting request filter phase"
755        );
756
757        // Apply per-listener timeouts from config
758        if let Some(server_addr) = session.downstream_session.server_addr() {
759            let server_addr_str = server_addr.to_string();
760            let config = ctx
761                .config
762                .get_or_insert_with(|| self.config_manager.current());
763            for listener in &config.listeners {
764                if listener.address == server_addr_str {
765                    // Apply downstream read timeout
766                    session.downstream_session.set_read_timeout(Some(
767                        std::time::Duration::from_secs(listener.request_timeout_secs),
768                    ));
769                    // Store keepalive for response phase
770                    ctx.listener_keepalive_timeout_secs = Some(listener.keepalive_timeout_secs);
771                    break;
772                }
773            }
774        }
775
776        // Check rate limiting early (before other processing)
777        // Fast path: skip if no rate limiting is configured for this route
778        if let Some(route_id) = ctx.route_id.as_deref() {
779            if self.rate_limit_manager.has_route_limiter(route_id) {
780                let rate_result = self.rate_limit_manager.check(
781                    route_id,
782                    &ctx.client_ip,
783                    &ctx.path,
784                    Option::<&NoHeaderAccessor>::None,
785                );
786
787                // Store rate limit info for response headers (even if allowed)
788                if rate_result.limit > 0 {
789                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
790                        limit: rate_result.limit,
791                        remaining: rate_result.remaining,
792                        reset_at: rate_result.reset_at,
793                    });
794                }
795
796                if !rate_result.allowed {
797                    use grapsus_config::RateLimitAction;
798
799                    match rate_result.action {
800                        RateLimitAction::Reject => {
801                            warn!(
802                                correlation_id = %ctx.trace_id,
803                                route_id = route_id,
804                                client_ip = %ctx.client_ip,
805                                limiter = %rate_result.limiter,
806                                limit = rate_result.limit,
807                                remaining = rate_result.remaining,
808                                "Request rate limited"
809                            );
810                            self.metrics.record_blocked_request("rate_limited");
811
812                            // Audit log the rate limit
813                            let audit_entry = AuditLogEntry::rate_limited(
814                                &ctx.trace_id,
815                                &ctx.method,
816                                &ctx.path,
817                                &ctx.client_ip,
818                                &rate_result.limiter,
819                            )
820                            .with_route_id(route_id)
821                            .with_status_code(rate_result.status_code);
822                            self.log_manager.log_audit(&audit_entry);
823
824                            // Send rate limit response with headers
825                            let body = rate_result
826                                .message
827                                .unwrap_or_else(|| "Rate limit exceeded".to_string());
828
829                            // Build response with rate limit headers
830                            let retry_after = rate_result.reset_at.saturating_sub(
831                                std::time::SystemTime::now()
832                                    .duration_since(std::time::UNIX_EPOCH)
833                                    .unwrap_or_default()
834                                    .as_secs(),
835                            );
836                            crate::http_helpers::write_rate_limit_error(
837                                session,
838                                rate_result.status_code,
839                                &body,
840                                rate_result.limit,
841                                rate_result.remaining,
842                                rate_result.reset_at,
843                                retry_after,
844                            )
845                            .await?;
846                            return Ok(true); // Request complete, don't continue
847                        }
848                        RateLimitAction::LogOnly => {
849                            debug!(
850                                correlation_id = %ctx.trace_id,
851                                route_id = route_id,
852                                "Rate limit exceeded (log only mode)"
853                            );
854                            // Continue processing
855                        }
856                        RateLimitAction::Delay => {
857                            // Apply delay if suggested by rate limiter
858                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
859                                // Cap delay at the configured maximum
860                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);
861
862                                if actual_delay > 0 {
863                                    debug!(
864                                        correlation_id = %ctx.trace_id,
865                                        route_id = route_id,
866                                        suggested_delay_ms = delay_ms,
867                                        max_delay_ms = rate_result.max_delay_ms,
868                                        actual_delay_ms = actual_delay,
869                                        "Applying rate limit delay"
870                                    );
871
872                                    tokio::time::sleep(std::time::Duration::from_millis(
873                                        actual_delay,
874                                    ))
875                                    .await;
876                                }
877                            }
878                            // Continue processing after delay
879                        }
880                    }
881                }
882            }
883        }
884
885        // Inference rate limiting (token-based, for LLM/AI routes)
886        // This runs after regular rate limiting and checks service type
887        if let Some(route_id) = ctx.route_id.as_deref() {
888            if let Some(ref route_config) = ctx.route_config {
889                if route_config.service_type == grapsus_config::ServiceType::Inference
890                    && self.inference_rate_limit_manager.has_route(route_id)
891                {
892                    // For inference rate limiting, we need access to the request body
893                    // to estimate tokens. We'll use buffered body if available.
894                    let headers = &session.req_header().headers;
895
896                    // Try to get buffered body, or use empty (will estimate from headers only)
897                    let body = ctx.body_buffer.as_slice();
898
899                    // Use client IP as the rate limit key (could be enhanced to use API key header)
900                    let rate_limit_key = &ctx.client_ip;
901
902                    if let Some(check_result) = self.inference_rate_limit_manager.check(
903                        route_id,
904                        rate_limit_key,
905                        headers,
906                        body,
907                    ) {
908                        // Store inference rate limiting context for recording actual tokens later
909                        ctx.inference_rate_limit_enabled = true;
910                        ctx.inference_estimated_tokens = check_result.estimated_tokens;
911                        ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
912                        ctx.inference_model = check_result.model.clone();
913
914                        if !check_result.is_allowed() {
915                            let retry_after_ms = check_result.retry_after_ms();
916                            let retry_after_secs = retry_after_ms.div_ceil(1000);
917
918                            warn!(
919                                correlation_id = %ctx.trace_id,
920                                route_id = route_id,
921                                client_ip = %ctx.client_ip,
922                                estimated_tokens = check_result.estimated_tokens,
923                                model = ?check_result.model,
924                                retry_after_ms = retry_after_ms,
925                                "Inference rate limit exceeded (tokens)"
926                            );
927                            self.metrics
928                                .record_blocked_request("inference_rate_limited");
929
930                            // Audit log the token rate limit
931                            let audit_entry = AuditLogEntry::new(
932                                &ctx.trace_id,
933                                AuditEventType::RateLimitExceeded,
934                                &ctx.method,
935                                &ctx.path,
936                                &ctx.client_ip,
937                            )
938                            .with_route_id(route_id)
939                            .with_status_code(429)
940                            .with_reason(format!(
941                                "Token rate limit exceeded: estimated {} tokens, model={:?}",
942                                check_result.estimated_tokens, check_result.model
943                            ));
944                            self.log_manager.log_audit(&audit_entry);
945
946                            // Send 429 response with appropriate headers
947                            let body = "Token rate limit exceeded";
948                            let reset_at = std::time::SystemTime::now()
949                                .duration_since(std::time::UNIX_EPOCH)
950                                .unwrap_or_default()
951                                .as_secs()
952                                + retry_after_secs;
953
954                            // Use simplified error write for inference rate limit
955                            crate::http_helpers::write_rate_limit_error(
956                                session,
957                                429,
958                                body,
959                                0, // No request limit
960                                0, // No remaining
961                                reset_at,
962                                retry_after_secs,
963                            )
964                            .await?;
965                            return Ok(true); // Request complete, don't continue
966                        }
967
968                        trace!(
969                            correlation_id = %ctx.trace_id,
970                            route_id = route_id,
971                            estimated_tokens = check_result.estimated_tokens,
972                            model = ?check_result.model,
973                            "Inference rate limit check passed"
974                        );
975
976                        // Check budget tracking (cumulative per-period limits)
977                        if self.inference_rate_limit_manager.has_budget(route_id) {
978                            ctx.inference_budget_enabled = true;
979
980                            if let Some(budget_result) =
981                                self.inference_rate_limit_manager.check_budget(
982                                    route_id,
983                                    rate_limit_key,
984                                    check_result.estimated_tokens,
985                                )
986                            {
987                                if !budget_result.is_allowed() {
988                                    let retry_after_secs = budget_result.retry_after_secs();
989
990                                    warn!(
991                                        correlation_id = %ctx.trace_id,
992                                        route_id = route_id,
993                                        client_ip = %ctx.client_ip,
994                                        estimated_tokens = check_result.estimated_tokens,
995                                        retry_after_secs = retry_after_secs,
996                                        "Token budget exhausted"
997                                    );
998
999                                    ctx.inference_budget_exhausted = true;
1000                                    self.metrics.record_blocked_request("budget_exhausted");
1001
1002                                    // Audit log the budget exhaustion
1003                                    let audit_entry = AuditLogEntry::new(
1004                                        &ctx.trace_id,
1005                                        AuditEventType::RateLimitExceeded,
1006                                        &ctx.method,
1007                                        &ctx.path,
1008                                        &ctx.client_ip,
1009                                    )
1010                                    .with_route_id(route_id)
1011                                    .with_status_code(429)
1012                                    .with_reason("Token budget exhausted".to_string());
1013                                    self.log_manager.log_audit(&audit_entry);
1014
1015                                    // Send 429 response with budget headers
1016                                    let body = "Token budget exhausted";
1017                                    let reset_at = std::time::SystemTime::now()
1018                                        .duration_since(std::time::UNIX_EPOCH)
1019                                        .unwrap_or_default()
1020                                        .as_secs()
1021                                        + retry_after_secs;
1022
1023                                    crate::http_helpers::write_rate_limit_error(
1024                                        session,
1025                                        429,
1026                                        body,
1027                                        0,
1028                                        0,
1029                                        reset_at,
1030                                        retry_after_secs,
1031                                    )
1032                                    .await?;
1033                                    return Ok(true);
1034                                }
1035
1036                                // Capture budget status for response headers
1037                                let remaining = match &budget_result {
1038                                    grapsus_common::budget::BudgetCheckResult::Allowed {
1039                                        remaining,
1040                                    } => *remaining as i64,
1041                                    grapsus_common::budget::BudgetCheckResult::Soft {
1042                                        remaining,
1043                                        ..
1044                                    } => *remaining,
1045                                    _ => 0,
1046                                };
1047                                ctx.inference_budget_remaining = Some(remaining);
1048
1049                                // Get period reset time from budget status
1050                                if let Some(status) = self
1051                                    .inference_rate_limit_manager
1052                                    .budget_status(route_id, rate_limit_key)
1053                                {
1054                                    ctx.inference_budget_period_reset = Some(status.period_end);
1055                                }
1056
1057                                trace!(
1058                                    correlation_id = %ctx.trace_id,
1059                                    route_id = route_id,
1060                                    budget_remaining = remaining,
1061                                    "Token budget check passed"
1062                                );
1063                            }
1064                        }
1065
1066                        // Check if cost attribution is enabled
1067                        if self
1068                            .inference_rate_limit_manager
1069                            .has_cost_attribution(route_id)
1070                        {
1071                            ctx.inference_cost_enabled = true;
1072                        }
1073                    }
1074                }
1075            }
1076        }
1077
1078        // Prompt injection guardrail (for inference routes)
1079        if let Some(ref route_config) = ctx.route_config {
1080            if let Some(ref inference) = route_config.inference {
1081                if let Some(ref guardrails) = inference.guardrails {
1082                    if let Some(ref pi_config) = guardrails.prompt_injection {
1083                        if pi_config.enabled && !ctx.body_buffer.is_empty() {
1084                            ctx.guardrails_enabled = true;
1085
1086                            // Extract content from request body
1087                            if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1088                                let result = self
1089                                    .guardrail_processor
1090                                    .check_prompt_injection(
1091                                        pi_config,
1092                                        &content,
1093                                        ctx.inference_model.as_deref(),
1094                                        ctx.route_id.as_deref(),
1095                                        &ctx.trace_id,
1096                                    )
1097                                    .await;
1098
1099                                match result {
1100                                    PromptInjectionResult::Blocked {
1101                                        status,
1102                                        message,
1103                                        detections,
1104                                    } => {
1105                                        warn!(
1106                                            correlation_id = %ctx.trace_id,
1107                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1108                                            detection_count = detections.len(),
1109                                            "Prompt injection detected, blocking request"
1110                                        );
1111
1112                                        self.metrics.record_blocked_request("prompt_injection");
1113
1114                                        // Store detection categories for logging
1115                                        ctx.guardrail_detection_categories =
1116                                            detections.iter().map(|d| d.category.clone()).collect();
1117
1118                                        // Audit log the block
1119                                        let audit_entry = AuditLogEntry::new(
1120                                            &ctx.trace_id,
1121                                            AuditEventType::Blocked,
1122                                            &ctx.method,
1123                                            &ctx.path,
1124                                            &ctx.client_ip,
1125                                        )
1126                                        .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1127                                        .with_status_code(status)
1128                                        .with_reason("Prompt injection detected".to_string());
1129                                        self.log_manager.log_audit(&audit_entry);
1130
1131                                        // Send error response
1132                                        crate::http_helpers::write_json_error(
1133                                            session,
1134                                            status,
1135                                            "prompt_injection_blocked",
1136                                            Some(&message),
1137                                        )
1138                                        .await?;
1139                                        return Ok(true);
1140                                    }
1141                                    PromptInjectionResult::Detected { detections } => {
1142                                        // Log but allow
1143                                        warn!(
1144                                            correlation_id = %ctx.trace_id,
1145                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1146                                            detection_count = detections.len(),
1147                                            "Prompt injection detected (logged only)"
1148                                        );
1149                                        ctx.guardrail_detection_categories =
1150                                            detections.iter().map(|d| d.category.clone()).collect();
1151                                    }
1152                                    PromptInjectionResult::Warning { detections } => {
1153                                        // Set flag for response header
1154                                        ctx.guardrail_warning = true;
1155                                        ctx.guardrail_detection_categories =
1156                                            detections.iter().map(|d| d.category.clone()).collect();
1157                                        debug!(
1158                                            correlation_id = %ctx.trace_id,
1159                                            "Prompt injection warning set"
1160                                        );
1161                                    }
1162                                    PromptInjectionResult::Clean => {
1163                                        trace!(
1164                                            correlation_id = %ctx.trace_id,
1165                                            "No prompt injection detected"
1166                                        );
1167                                    }
1168                                    PromptInjectionResult::Error { message } => {
1169                                        // Already logged in processor, just trace here
1170                                        trace!(
1171                                            correlation_id = %ctx.trace_id,
1172                                            error = %message,
1173                                            "Prompt injection check error (failure mode applied)"
1174                                        );
1175                                    }
1176                                }
1177                            }
1178                        }
1179                    }
1180                }
1181            }
1182        }
1183
1184        // Geo filtering
1185        if let Some(route_id) = ctx.route_id.as_deref() {
1186            if let Some(ref route_config) = ctx.route_config {
1187                for filter_id in &route_config.filters {
1188                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1189                        // Store country code for response header
1190                        ctx.geo_country_code = result.country_code.clone();
1191                        ctx.geo_lookup_performed = true;
1192
1193                        if !result.allowed {
1194                            warn!(
1195                                correlation_id = %ctx.trace_id,
1196                                route_id = route_id,
1197                                client_ip = %ctx.client_ip,
1198                                country = ?result.country_code,
1199                                filter_id = %filter_id,
1200                                "Request blocked by geo filter"
1201                            );
1202                            self.metrics.record_blocked_request("geo_blocked");
1203
1204                            // Audit log the geo block
1205                            let audit_entry = AuditLogEntry::new(
1206                                &ctx.trace_id,
1207                                AuditEventType::Blocked,
1208                                &ctx.method,
1209                                &ctx.path,
1210                                &ctx.client_ip,
1211                            )
1212                            .with_route_id(route_id)
1213                            .with_status_code(result.status_code)
1214                            .with_reason(format!(
1215                                "Geo blocked: country={}, filter={}",
1216                                result.country_code.as_deref().unwrap_or("unknown"),
1217                                filter_id
1218                            ));
1219                            self.log_manager.log_audit(&audit_entry);
1220
1221                            // Send geo block response
1222                            let body = result
1223                                .block_message
1224                                .unwrap_or_else(|| "Access denied".to_string());
1225
1226                            crate::http_helpers::write_error(
1227                                session,
1228                                result.status_code,
1229                                &body,
1230                                "text/plain",
1231                            )
1232                            .await?;
1233                            return Ok(true); // Request complete, don't continue
1234                        }
1235
1236                        // Only check first geo filter that matches
1237                        break;
1238                    }
1239                }
1240            }
1241        }
1242
1243        // Route-level filters (CORS preflight, Timeout, Log)
1244        // Clone the Arc to avoid borrow conflict between &Config and &mut ctx
1245        let config_for_filters = std::sync::Arc::clone(
1246            ctx.config
1247                .get_or_insert_with(|| self.config_manager.current()),
1248        );
1249        if super::filters::apply_request_filters(session, ctx, &config_for_filters).await? {
1250            return Ok(true); // Filter handled request (e.g. CORS preflight)
1251        }
1252
1253        // Check for WebSocket upgrade requests
1254        let is_websocket_upgrade = session
1255            .req_header()
1256            .headers
1257            .get(http::header::UPGRADE)
1258            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1259            .unwrap_or(false);
1260
1261        if is_websocket_upgrade {
1262            ctx.is_websocket_upgrade = true;
1263
1264            // Check if route allows WebSocket upgrades
1265            if let Some(ref route_config) = ctx.route_config {
1266                if !route_config.websocket {
1267                    warn!(
1268                        correlation_id = %ctx.trace_id,
1269                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270                        client_ip = %ctx.client_ip,
1271                        "WebSocket upgrade rejected: not enabled for route"
1272                    );
1273
1274                    self.metrics.record_blocked_request("websocket_not_enabled");
1275
1276                    // Audit log the rejection
1277                    let audit_entry = AuditLogEntry::new(
1278                        &ctx.trace_id,
1279                        AuditEventType::Blocked,
1280                        &ctx.method,
1281                        &ctx.path,
1282                        &ctx.client_ip,
1283                    )
1284                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1285                    .with_action("websocket_rejected")
1286                    .with_reason("WebSocket not enabled for route");
1287                    self.log_manager.log_audit(&audit_entry);
1288
1289                    // Send 403 Forbidden response
1290                    crate::http_helpers::write_error(
1291                        session,
1292                        403,
1293                        "WebSocket not enabled for this route",
1294                        "text/plain",
1295                    )
1296                    .await?;
1297                    return Ok(true); // Request complete, don't continue
1298                }
1299
1300                debug!(
1301                    correlation_id = %ctx.trace_id,
1302                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1303                    "WebSocket upgrade request allowed"
1304                );
1305
1306                // Check for WebSocket frame inspection
1307                if route_config.websocket_inspection {
1308                    // Check for compression negotiation - skip inspection if permessage-deflate
1309                    let has_compression = session
1310                        .req_header()
1311                        .headers
1312                        .get("Sec-WebSocket-Extensions")
1313                        .and_then(|v| v.to_str().ok())
1314                        .map(|s| s.contains("permessage-deflate"))
1315                        .unwrap_or(false);
1316
1317                    if has_compression {
1318                        debug!(
1319                            correlation_id = %ctx.trace_id,
1320                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1321                            "WebSocket inspection skipped: permessage-deflate negotiated"
1322                        );
1323                        ctx.websocket_skip_inspection = true;
1324                    } else {
1325                        ctx.websocket_inspection_enabled = true;
1326
1327                        // Get agents that handle WebSocketFrame events
1328                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1329                            grapsus_agent_protocol::EventType::WebSocketFrame,
1330                        );
1331
1332                        debug!(
1333                            correlation_id = %ctx.trace_id,
1334                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1335                            agent_count = ctx.websocket_inspection_agents.len(),
1336                            "WebSocket frame inspection enabled"
1337                        );
1338                    }
1339                }
1340            }
1341        }
1342
1343        // Use cached route config from upstream_peer (avoids duplicate route matching)
1344        // Handle static file and builtin routes
1345        if let Some(route_config) = ctx.route_config.clone() {
1346            if route_config.service_type == grapsus_config::ServiceType::Static {
1347                trace!(
1348                    correlation_id = %ctx.trace_id,
1349                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1350                    "Handling static file route"
1351                );
1352                // Create a minimal RouteMatch for the handler
1353                let route_match = crate::routing::RouteMatch {
1354                    route_id: grapsus_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1355                    config: route_config.clone(),
1356                };
1357                return self.handle_static_route(session, ctx, &route_match).await;
1358            } else if route_config.service_type == grapsus_config::ServiceType::Builtin {
1359                trace!(
1360                    correlation_id = %ctx.trace_id,
1361                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1362                    builtin_handler = ?route_config.builtin_handler,
1363                    "Handling builtin route"
1364                );
1365                // Create a minimal RouteMatch for the handler
1366                let route_match = crate::routing::RouteMatch {
1367                    route_id: grapsus_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1368                    config: route_config.clone(),
1369                };
1370                return self.handle_builtin_route(session, ctx, &route_match).await;
1371            }
1372        }
1373
1374        // API validation for API routes
1375        if let Some(route_id) = ctx.route_id.clone() {
1376            if let Some(validator) = self.validators.get(&route_id).await {
1377                trace!(
1378                    correlation_id = %ctx.trace_id,
1379                    route_id = %route_id,
1380                    "Running API schema validation"
1381                );
1382                if let Some(result) = self
1383                    .validate_api_request(session, ctx, &route_id, &validator)
1384                    .await?
1385                {
1386                    debug!(
1387                        correlation_id = %ctx.trace_id,
1388                        route_id = %route_id,
1389                        validation_passed = result,
1390                        "API validation complete"
1391                    );
1392                    return Ok(result);
1393                }
1394            }
1395        }
1396
1397        // Get client address before mutable borrow
1398        let client_addr = session
1399            .client_addr()
1400            .map(|a| format!("{}", a))
1401            .unwrap_or_else(|| "unknown".to_string());
1402        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1403
1404        let req_header = session.req_header_mut();
1405
1406        // Add correlation ID header
1407        req_header
1408            .insert_header("X-Correlation-Id", &ctx.trace_id)
1409            .ok();
1410        req_header.insert_header("X-Forwarded-By", "Grapsus").ok();
1411
1412        // Use cached config (set in upstream_peer, or fetch now if needed)
1413        let config = ctx
1414            .config
1415            .get_or_insert_with(|| self.config_manager.current());
1416
1417        // Enforce header limits (fast path: skip if limits are very high)
1418        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
1419
1420        // Header count check - O(1)
1421        let header_count = req_header.headers.len();
1422        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1423            && header_count > config.limits.max_header_count
1424        {
1425            warn!(
1426                correlation_id = %ctx.trace_id,
1427                header_count = header_count,
1428                limit = config.limits.max_header_count,
1429                "Request blocked: exceeds header count limit"
1430            );
1431
1432            self.metrics.record_blocked_request("header_count_exceeded");
1433            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1434        }
1435
1436        // Header size check - O(n), skip if limit is very high
1437        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1438            let total_header_size: usize = req_header
1439                .headers
1440                .iter()
1441                .map(|(k, v)| k.as_str().len() + v.len())
1442                .sum();
1443
1444            if total_header_size > config.limits.max_header_size_bytes {
1445                warn!(
1446                    correlation_id = %ctx.trace_id,
1447                    header_size = total_header_size,
1448                    limit = config.limits.max_header_size_bytes,
1449                    "Request blocked: exceeds header size limit"
1450                );
1451
1452                self.metrics.record_blocked_request("header_size_exceeded");
1453                return Err(Error::explain(
1454                    ErrorType::InternalError,
1455                    "Headers too large",
1456                ));
1457            }
1458        }
1459
1460        // Process through external agents
1461        trace!(
1462            correlation_id = %ctx.trace_id,
1463            "Processing request through agents"
1464        );
1465        if let Err(e) = self
1466            .process_agents(session, ctx, &client_addr, client_port)
1467            .await
1468        {
1469            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
1470            // In that case, we need to send a proper HTTP response instead of just closing the connection
1471            if let ErrorType::HTTPStatus(status) = e.etype() {
1472                // Extract the message from the error (the context part after "HTTPStatus context:")
1473                let error_msg = e.to_string();
1474                let body = error_msg
1475                    .split("context:")
1476                    .nth(1)
1477                    .map(|s| s.trim())
1478                    .unwrap_or("Request blocked");
1479                debug!(
1480                    correlation_id = %ctx.trace_id,
1481                    status = status,
1482                    body = %body,
1483                    "Sending HTTP error response for agent block"
1484                );
1485                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1486                return Ok(true); // Request complete, don't continue to upstream
1487            }
1488            // For other errors, propagate them
1489            return Err(e);
1490        }
1491
1492        trace!(
1493            correlation_id = %ctx.trace_id,
1494            "Request filter phase complete, forwarding to upstream"
1495        );
1496
1497        Ok(false) // Continue processing
1498    }
1499
1500    /// Process incoming request body chunks.
1501    /// Used for body size enforcement and WAF/agent inspection.
1502    ///
1503    /// Supports two modes:
1504    /// - **Buffer mode** (default): Buffer chunks until end of stream or limit, then send to agents
1505    /// - **Stream mode**: Send each chunk immediately to agents as it arrives
1506    async fn request_body_filter(
1507        &self,
1508        _session: &mut Session,
1509        body: &mut Option<Bytes>,
1510        end_of_stream: bool,
1511        ctx: &mut Self::CTX,
1512    ) -> Result<(), Box<Error>> {
1513        use grapsus_config::BodyStreamingMode;
1514
1515        // Handle WebSocket frame inspection (client -> server)
1516        if ctx.is_websocket_upgrade {
1517            if let Some(ref handler) = ctx.websocket_handler {
1518                let result = handler.process_client_data(body.take()).await;
1519                match result {
1520                    crate::websocket::ProcessResult::Forward(data) => {
1521                        *body = data;
1522                    }
1523                    crate::websocket::ProcessResult::Close(reason) => {
1524                        warn!(
1525                            correlation_id = %ctx.trace_id,
1526                            code = reason.code,
1527                            reason = %reason.reason,
1528                            "WebSocket connection closed by agent (client->server)"
1529                        );
1530                        // Return an error to close the connection
1531                        return Err(Error::explain(
1532                            ErrorType::InternalError,
1533                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
1534                        ));
1535                    }
1536                }
1537            }
1538            // Skip normal body processing for WebSocket
1539            return Ok(());
1540        }
1541
1542        // Track request body size
1543        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
1544        if chunk_len > 0 {
1545            ctx.request_body_bytes += chunk_len as u64;
1546
1547            trace!(
1548                correlation_id = %ctx.trace_id,
1549                chunk_size = chunk_len,
1550                total_body_bytes = ctx.request_body_bytes,
1551                end_of_stream = end_of_stream,
1552                streaming_mode = ?ctx.request_body_streaming_mode,
1553                "Processing request body chunk"
1554            );
1555
1556            // Check body size limit (use cached config)
1557            let config = ctx
1558                .config
1559                .get_or_insert_with(|| self.config_manager.current());
1560            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
1561                warn!(
1562                    correlation_id = %ctx.trace_id,
1563                    body_bytes = ctx.request_body_bytes,
1564                    limit = config.limits.max_body_size_bytes,
1565                    "Request body size limit exceeded"
1566                );
1567                self.metrics.record_blocked_request("body_size_exceeded");
1568                return Err(Error::explain(
1569                    ErrorType::InternalError,
1570                    "Request body too large",
1571                ));
1572            }
1573        }
1574
1575        // Body inspection for agents (WAF, etc.)
1576        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
1577            let config = ctx
1578                .config
1579                .get_or_insert_with(|| self.config_manager.current());
1580            let max_inspection_bytes = config
1581                .waf
1582                .as_ref()
1583                .map(|w| w.body_inspection.max_inspection_bytes as u64)
1584                .unwrap_or(1024 * 1024);
1585
1586            match ctx.request_body_streaming_mode {
1587                BodyStreamingMode::Stream => {
1588                    // Stream mode: send each chunk immediately
1589                    if body.is_some() {
1590                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1591                            .await?;
1592                    } else if end_of_stream && ctx.agent_needs_more {
1593                        // Send final empty chunk to signal end
1594                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1595                            .await?;
1596                    }
1597                }
1598                BodyStreamingMode::Hybrid { buffer_threshold } => {
1599                    // Hybrid mode: buffer up to threshold, then stream
1600                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
1601                        // Still in buffering phase
1602                        if let Some(ref chunk) = body {
1603                            let bytes_to_buffer = std::cmp::min(
1604                                chunk.len(),
1605                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
1606                            );
1607                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
1608                            ctx.body_bytes_inspected += bytes_to_buffer as u64;
1609
1610                            // If we've reached threshold or end of stream, switch to streaming
1611                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
1612                            {
1613                                // Send buffered content first
1614                                self.send_buffered_body_to_agents(
1615                                    end_of_stream && chunk.len() == bytes_to_buffer,
1616                                    ctx,
1617                                )
1618                                .await?;
1619                                ctx.body_buffer.clear();
1620
1621                                // If there's remaining data in this chunk, stream it
1622                                if bytes_to_buffer < chunk.len() {
1623                                    let remaining = chunk.slice(bytes_to_buffer..);
1624                                    let mut remaining_body = Some(remaining);
1625                                    self.process_body_chunk_streaming(
1626                                        &mut remaining_body,
1627                                        end_of_stream,
1628                                        ctx,
1629                                    )
1630                                    .await?;
1631                                }
1632                            }
1633                        }
1634                    } else {
1635                        // Past threshold, stream directly
1636                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
1637                            .await?;
1638                    }
1639                }
1640                BodyStreamingMode::Buffer => {
1641                    // Buffer mode: collect chunks until ready to send
1642                    if let Some(ref chunk) = body {
1643                        if ctx.body_bytes_inspected < max_inspection_bytes {
1644                            let bytes_to_inspect = std::cmp::min(
1645                                chunk.len() as u64,
1646                                max_inspection_bytes - ctx.body_bytes_inspected,
1647                            ) as usize;
1648
1649                            ctx.body_buffer
1650                                .extend_from_slice(&chunk[..bytes_to_inspect]);
1651                            ctx.body_bytes_inspected += bytes_to_inspect as u64;
1652
1653                            trace!(
1654                                correlation_id = %ctx.trace_id,
1655                                bytes_inspected = ctx.body_bytes_inspected,
1656                                max_inspection_bytes = max_inspection_bytes,
1657                                buffer_size = ctx.body_buffer.len(),
1658                                "Buffering body for agent inspection"
1659                            );
1660                        }
1661                    }
1662
1663                    // Send when complete or limit reached
1664                    let should_send =
1665                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
1666                    if should_send && !ctx.body_buffer.is_empty() {
1667                        self.send_buffered_body_to_agents(end_of_stream, ctx)
1668                            .await?;
1669                        ctx.body_buffer.clear();
1670                    }
1671                }
1672            }
1673        }
1674
1675        if end_of_stream {
1676            trace!(
1677                correlation_id = %ctx.trace_id,
1678                total_body_bytes = ctx.request_body_bytes,
1679                bytes_inspected = ctx.body_bytes_inspected,
1680                "Request body complete"
1681            );
1682        }
1683
1684        Ok(())
1685    }
1686
1687    async fn response_filter(
1688        &self,
1689        session: &mut Session,
1690        upstream_response: &mut ResponseHeader,
1691        ctx: &mut Self::CTX,
1692    ) -> Result<(), Box<Error>> {
1693        let status = upstream_response.status.as_u16();
1694        let duration = ctx.elapsed();
1695
1696        trace!(
1697            correlation_id = %ctx.trace_id,
1698            status = status,
1699            "Starting response filter phase"
1700        );
1701
1702        // Handle WebSocket 101 Switching Protocols
1703        if status == 101 && ctx.is_websocket_upgrade {
1704            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
1705                // Create WebSocket inspector and handler with metrics
1706                let inspector = crate::websocket::WebSocketInspector::with_metrics(
1707                    self.agent_manager.clone(),
1708                    ctx.route_id
1709                        .clone()
1710                        .unwrap_or_else(|| "unknown".to_string()),
1711                    ctx.trace_id.clone(),
1712                    ctx.client_ip.clone(),
1713                    100, // 100ms timeout per frame inspection
1714                    Some(self.metrics.clone()),
1715                );
1716
1717                let handler = crate::websocket::WebSocketHandler::new(
1718                    std::sync::Arc::new(inspector),
1719                    1024 * 1024, // 1MB max frame size
1720                );
1721
1722                ctx.websocket_handler = Some(std::sync::Arc::new(handler));
1723
1724                info!(
1725                    correlation_id = %ctx.trace_id,
1726                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1727                    agent_count = ctx.websocket_inspection_agents.len(),
1728                    "WebSocket upgrade successful, frame inspection enabled"
1729                );
1730            } else if ctx.websocket_skip_inspection {
1731                debug!(
1732                    correlation_id = %ctx.trace_id,
1733                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1734                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
1735                );
1736            } else {
1737                debug!(
1738                    correlation_id = %ctx.trace_id,
1739                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1740                    "WebSocket upgrade successful"
1741                );
1742            }
1743        }
1744
1745        // Add correlation ID to response
1746        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;
1747
1748        // Add rate limit headers if rate limiting was applied
1749        if let Some(ref rate_info) = ctx.rate_limit_info {
1750            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
1751            upstream_response
1752                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
1753            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
1754        }
1755
1756        // Add token budget headers if budget tracking was enabled
1757        if ctx.inference_budget_enabled {
1758            if let Some(remaining) = ctx.inference_budget_remaining {
1759                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
1760            }
1761            if let Some(period_reset) = ctx.inference_budget_period_reset {
1762                // Format as ISO 8601 timestamp
1763                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
1764                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1765                    .unwrap_or_else(|| period_reset.to_string());
1766                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
1767            }
1768        }
1769
1770        // Add GeoIP country header if geo lookup was performed
1771        if let Some(ref country_code) = ctx.geo_country_code {
1772            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
1773        }
1774
1775        // Apply route-specific response header modifications (policies)
1776        if let Some(ref route_config) = ctx.route_config {
1777            let mods = &route_config.policies.response_headers;
1778            // Rename runs before set/add/remove
1779            for (old_name, new_name) in &mods.rename {
1780                if let Some(value) = upstream_response
1781                    .headers
1782                    .get(old_name)
1783                    .and_then(|v| v.to_str().ok())
1784                {
1785                    let owned = value.to_string();
1786                    upstream_response
1787                        .insert_header(new_name.clone(), &owned)
1788                        .ok();
1789                    upstream_response.remove_header(old_name);
1790                }
1791            }
1792            for (name, value) in &mods.set {
1793                upstream_response
1794                    .insert_header(name.clone(), value.as_str())
1795                    .ok();
1796            }
1797            for (name, value) in &mods.add {
1798                upstream_response
1799                    .append_header(name.clone(), value.as_str())
1800                    .ok();
1801            }
1802            for name in &mods.remove {
1803                upstream_response.remove_header(name);
1804            }
1805        }
1806
1807        // Inject Cache-Status header (RFC 9211) if enabled
1808        if let Some(ref cache_status) = ctx.cache_status {
1809            let status_header_enabled = ctx
1810                .config
1811                .as_ref()
1812                .and_then(|c| c.cache.as_ref())
1813                .map(|c| c.status_header)
1814                .unwrap_or(false);
1815
1816            if status_header_enabled {
1817                let value = match cache_status {
1818                    super::context::CacheStatus::HitMemory => "grapsus; hit; detail=memory",
1819                    super::context::CacheStatus::HitDisk => "grapsus; hit; detail=disk",
1820                    super::context::CacheStatus::Hit => "grapsus; hit",
1821                    super::context::CacheStatus::HitStale => "grapsus; fwd=stale",
1822                    super::context::CacheStatus::Miss => "grapsus; fwd=miss",
1823                    super::context::CacheStatus::Bypass(reason) => {
1824                        // Leak a static string for the formatted bypass value
1825                        // This is fine since there are only a few fixed reason strings
1826                        match *reason {
1827                            "method" => "grapsus; fwd=bypass; detail=method",
1828                            "disabled" => "grapsus; fwd=bypass; detail=disabled",
1829                            "no-route" => "grapsus; fwd=bypass; detail=no-route",
1830                            _ => "grapsus; fwd=bypass",
1831                        }
1832                    }
1833                };
1834                upstream_response.insert_header("Cache-Status", value).ok();
1835            }
1836        }
1837
1838        // Apply response-phase route filters (Headers, CORS, Compress, Log)
1839        if let Some(config) = ctx.config.as_ref().map(std::sync::Arc::clone) {
1840            super::filters::apply_response_filters(upstream_response, ctx, &config);
1841        }
1842
1843        // Enable Pingora response compression if Compress filter marked it eligible
1844        if ctx.compress_enabled {
1845            session.upstream_compression.adjust_level(6);
1846        }
1847
1848        // Apply per-listener keepalive timeout
1849        if let Some(keepalive_secs) = ctx.listener_keepalive_timeout_secs {
1850            session
1851                .downstream_session
1852                .set_keepalive(Some(keepalive_secs));
1853        }
1854
1855        // Add sticky session cookie if a new assignment was made
1856        if ctx.sticky_session_new_assignment {
1857            if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
1858                upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
1859                trace!(
1860                    correlation_id = %ctx.trace_id,
1861                    sticky_target_index = ?ctx.sticky_target_index,
1862                    "Added sticky session Set-Cookie header"
1863                );
1864            }
1865        }
1866
1867        // Add guardrail warning header if prompt injection was detected (warn mode)
1868        if ctx.guardrail_warning {
1869            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
1870        }
1871
1872        // Add fallback routing headers if fallback was used
1873        if ctx.used_fallback() {
1874            upstream_response.insert_header("X-Fallback-Used", "true")?;
1875
1876            if let Some(ref upstream) = ctx.upstream {
1877                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
1878            }
1879
1880            if let Some(ref reason) = ctx.fallback_reason {
1881                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
1882            }
1883
1884            if let Some(ref original) = ctx.original_upstream {
1885                upstream_response.insert_header("X-Original-Upstream", original)?;
1886            }
1887
1888            if let Some(ref mapping) = ctx.model_mapping_applied {
1889                upstream_response
1890                    .insert_header("X-Model-Mapping", format!("{} -> {}", mapping.0, mapping.1))?;
1891            }
1892
1893            trace!(
1894                correlation_id = %ctx.trace_id,
1895                fallback_attempt = ctx.fallback_attempt,
1896                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1897                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
1898                "Added fallback response headers"
1899            );
1900
1901            // Record fallback success metrics for successful responses (2xx/3xx)
1902            if status < 400 {
1903                if let Some(metrics) = get_fallback_metrics() {
1904                    metrics.record_fallback_success(
1905                        ctx.route_id.as_deref().unwrap_or("unknown"),
1906                        ctx.upstream.as_deref().unwrap_or("unknown"),
1907                    );
1908                }
1909            }
1910        }
1911
1912        // Initialize streaming token counter for SSE responses on inference routes
1913        if ctx.inference_rate_limit_enabled {
1914            // Check if this is an SSE response
1915            let content_type = upstream_response
1916                .headers
1917                .get("content-type")
1918                .and_then(|ct| ct.to_str().ok());
1919
1920            if is_sse_response(content_type) {
1921                // Get provider from route config
1922                let provider = ctx
1923                    .route_config
1924                    .as_ref()
1925                    .and_then(|r| r.inference.as_ref())
1926                    .map(|i| i.provider)
1927                    .unwrap_or_default();
1928
1929                ctx.inference_streaming_response = true;
1930                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
1931                    provider,
1932                    ctx.inference_model.clone(),
1933                ));
1934
1935                trace!(
1936                    correlation_id = %ctx.trace_id,
1937                    content_type = ?content_type,
1938                    model = ?ctx.inference_model,
1939                    "Initialized streaming token counter for SSE response"
1940                );
1941            }
1942        }
1943
1944        // Process response headers through agents (for agents that subscribe to ResponseHeaders events)
1945        if !ctx.route_agent_ids.is_empty() {
1946            let agent_ids = ctx.route_agent_ids.clone();
1947            let mut resp_headers_map: std::collections::HashMap<String, Vec<String>> =
1948                std::collections::HashMap::with_capacity(upstream_response.headers.len());
1949            for (name, value) in upstream_response.headers.iter() {
1950                resp_headers_map
1951                    .entry(name.as_str().to_string())
1952                    .or_default()
1953                    .push(value.to_str().unwrap_or("").to_string());
1954            }
1955
1956            let agent_ctx = crate::agents::AgentCallContext {
1957                correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
1958                metadata: grapsus_agent_protocol::RequestMetadata {
1959                    correlation_id: ctx.trace_id.clone(),
1960                    request_id: uuid::Uuid::new_v4().to_string(),
1961                    client_ip: ctx.client_ip.clone(),
1962                    client_port: 0,
1963                    server_name: ctx.host.clone(),
1964                    protocol: "HTTP/1.1".to_string(),
1965                    tls_version: None,
1966                    tls_cipher: None,
1967                    route_id: ctx.route_id.clone(),
1968                    upstream_id: ctx.upstream.clone(),
1969                    timestamp: chrono::Utc::now().to_rfc3339(),
1970                    traceparent: ctx.traceparent(),
1971                },
1972                route_id: ctx.route_id.clone(),
1973                upstream_id: ctx.upstream.clone(),
1974                request_body: None,
1975                response_body: None,
1976            };
1977
1978            match self
1979                .agent_manager
1980                .process_response_headers(&agent_ctx, status, &resp_headers_map, &agent_ids)
1981                .await
1982            {
1983                Ok(decision) => {
1984                    // Apply response header modifications from agent
1985                    for op in &decision.response_headers {
1986                        match op {
1987                            grapsus_agent_protocol::HeaderOp::Set { name, value } => {
1988                                upstream_response
1989                                    .insert_header(name.clone(), value.as_str())
1990                                    .ok();
1991                            }
1992                            grapsus_agent_protocol::HeaderOp::Add { name, value } => {
1993                                upstream_response
1994                                    .append_header(name.clone(), value.as_str())
1995                                    .ok();
1996                            }
1997                            grapsus_agent_protocol::HeaderOp::Remove { name } => {
1998                                upstream_response.remove_header(name);
1999                            }
2000                        }
2001                    }
2002
2003                    // Check if any agent subscribes to response body events
2004                    let has_body_agents = self
2005                        .agent_manager
2006                        .any_agent_handles_event(
2007                            &agent_ids,
2008                            grapsus_agent_protocol::EventType::ResponseBodyChunk,
2009                        )
2010                        .await;
2011                    if has_body_agents {
2012                        ctx.response_agent_processing_enabled = true;
2013                        // Since agent may replace the body, Content-Length is invalid.
2014                        // Use Connection: close to signal end-of-body to the client.
2015                        upstream_response.insert_header("Connection", "close").ok();
2016                        session.downstream_session.set_keepalive(None);
2017                        debug!(
2018                            correlation_id = %ctx.trace_id,
2019                            "Enabling response body agent processing (agent subscribes to ResponseBody)"
2020                        );
2021                    }
2022
2023                    debug!(
2024                        correlation_id = %ctx.trace_id,
2025                        response_headers_modified = !decision.response_headers.is_empty(),
2026                        needs_body = ctx.response_agent_processing_enabled,
2027                        "Response headers processed through agents"
2028                    );
2029                }
2030                Err(e) => {
2031                    warn!(
2032                        correlation_id = %ctx.trace_id,
2033                        error = %e,
2034                        "Agent response header processing failed, continuing without agent"
2035                    );
2036                }
2037            }
2038        }
2039
2040        // Generate custom error pages for error responses
2041        if status >= 400 {
2042            trace!(
2043                correlation_id = %ctx.trace_id,
2044                status = status,
2045                "Handling error response"
2046            );
2047            self.handle_error_response(upstream_response, ctx).await?;
2048        }
2049
2050        // Record metrics
2051        self.metrics.record_request(
2052            ctx.route_id.as_deref().unwrap_or("unknown"),
2053            &ctx.method,
2054            status,
2055            duration,
2056        );
2057
2058        // Record OpenTelemetry span status
2059        if let Some(ref mut span) = ctx.otel_span {
2060            span.set_status(status);
2061            if let Some(ref upstream) = ctx.upstream {
2062                span.set_upstream(upstream, "");
2063            }
2064            if status >= 500 {
2065                span.record_error(&format!("HTTP {}", status));
2066            }
2067        }
2068
2069        // Record passive health check
2070        if let Some(ref upstream) = ctx.upstream {
2071            let success = status < 500;
2072
2073            trace!(
2074                correlation_id = %ctx.trace_id,
2075                upstream = %upstream,
2076                success = success,
2077                status = status,
2078                "Recording passive health check result"
2079            );
2080
2081            let error_msg = if !success {
2082                Some(format!("HTTP {}", status))
2083            } else {
2084                None
2085            };
2086            self.passive_health
2087                .record_outcome(upstream, success, error_msg.as_deref())
2088                .await;
2089
2090            // Report to upstream pool
2091            if let Some(pool) = self.upstream_pools.get(upstream).await {
2092                pool.report_result(upstream, success).await;
2093            }
2094
2095            if !success {
2096                warn!(
2097                    correlation_id = %ctx.trace_id,
2098                    upstream = %upstream,
2099                    status = status,
2100                    "Upstream returned error status"
2101                );
2102            }
2103        }
2104
2105        // Final request completion log
2106        if status >= 500 {
2107            error!(
2108                correlation_id = %ctx.trace_id,
2109                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2110                upstream = ctx.upstream.as_deref().unwrap_or("none"),
2111                method = %ctx.method,
2112                path = %ctx.path,
2113                status = status,
2114                duration_ms = duration.as_millis(),
2115                attempts = ctx.upstream_attempts,
2116                "Request completed with server error"
2117            );
2118            self.log_manager.log_request_error(
2119                "error",
2120                "Request completed with server error",
2121                &ctx.trace_id,
2122                ctx.route_id.as_deref(),
2123                ctx.upstream.as_deref(),
2124                Some(format!(
2125                    "status={} method={} path={} duration_ms={}",
2126                    status,
2127                    ctx.method,
2128                    ctx.path,
2129                    duration.as_millis()
2130                )),
2131            );
2132        } else if status >= 400 {
2133            warn!(
2134                correlation_id = %ctx.trace_id,
2135                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2136                upstream = ctx.upstream.as_deref().unwrap_or("none"),
2137                method = %ctx.method,
2138                path = %ctx.path,
2139                status = status,
2140                duration_ms = duration.as_millis(),
2141                "Request completed with client error"
2142            );
2143            self.log_manager.log_request_error(
2144                "warn",
2145                "Request completed with client error",
2146                &ctx.trace_id,
2147                ctx.route_id.as_deref(),
2148                ctx.upstream.as_deref(),
2149                Some(format!(
2150                    "status={} method={} path={}",
2151                    status, ctx.method, ctx.path
2152                )),
2153            );
2154        } else {
2155            debug!(
2156                correlation_id = %ctx.trace_id,
2157                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2158                upstream = ctx.upstream.as_deref().unwrap_or("none"),
2159                method = %ctx.method,
2160                path = %ctx.path,
2161                status = status,
2162                duration_ms = duration.as_millis(),
2163                attempts = ctx.upstream_attempts,
2164                "Request completed"
2165            );
2166        }
2167
2168        Ok(())
2169    }
2170
2171    /// Modify the request before sending to upstream.
2172    /// Used for header modifications, adding authentication, etc.
2173    async fn upstream_request_filter(
2174        &self,
2175        _session: &mut Session,
2176        upstream_request: &mut pingora::http::RequestHeader,
2177        ctx: &mut Self::CTX,
2178    ) -> Result<()>
2179    where
2180        Self::CTX: Send + Sync,
2181    {
2182        trace!(
2183            correlation_id = %ctx.trace_id,
2184            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2185            "Applying upstream request modifications"
2186        );
2187
2188        // Add trace ID header for upstream correlation
2189        upstream_request
2190            .insert_header("X-Trace-Id", &ctx.trace_id)
2191            .ok();
2192
2193        // Add W3C traceparent header for distributed tracing
2194        if let Some(ref span) = ctx.otel_span {
2195            let sampled = ctx
2196                .trace_context
2197                .as_ref()
2198                .map(|c| c.sampled)
2199                .unwrap_or(true);
2200            let traceparent =
2201                crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
2202            upstream_request
2203                .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
2204                .ok();
2205        }
2206
2207        // Add request metadata headers
2208        upstream_request
2209            .insert_header("X-Forwarded-By", "Grapsus")
2210            .ok();
2211
2212        // Apply route-specific request header modifications
2213        // Note: Pingora's IntoCaseHeaderName requires owned String for header names,
2214        // so we clone names but pass values by reference to avoid cloning both.
2215        if let Some(ref route_config) = ctx.route_config {
2216            let mods = &route_config.policies.request_headers;
2217
2218            // Rename runs before set/add/remove
2219            for (old_name, new_name) in &mods.rename {
2220                if let Some(value) = upstream_request
2221                    .headers
2222                    .get(old_name)
2223                    .and_then(|v| v.to_str().ok())
2224                {
2225                    let owned = value.to_string();
2226                    upstream_request
2227                        .insert_header(new_name.clone(), &owned)
2228                        .ok();
2229                    upstream_request.remove_header(old_name);
2230                }
2231            }
2232
2233            // Set headers (overwrite existing)
2234            for (name, value) in &mods.set {
2235                upstream_request
2236                    .insert_header(name.clone(), value.as_str())
2237                    .ok();
2238            }
2239
2240            // Add headers (append)
2241            for (name, value) in &mods.add {
2242                upstream_request
2243                    .append_header(name.clone(), value.as_str())
2244                    .ok();
2245            }
2246
2247            // Remove headers
2248            for name in &mods.remove {
2249                upstream_request.remove_header(name);
2250            }
2251
2252            trace!(
2253                correlation_id = %ctx.trace_id,
2254                "Applied request header modifications"
2255            );
2256        }
2257
2258        // Apply request-phase Headers filters
2259        if let Some(ref config) = ctx.config {
2260            super::filters::apply_request_headers_filters(upstream_request, ctx, config);
2261        }
2262
2263        // Remove sensitive headers that shouldn't go to upstream
2264        upstream_request.remove_header("X-Internal-Token");
2265        upstream_request.remove_header("Authorization-Internal");
2266
2267        // === Traffic Mirroring / Shadowing ===
2268        // Check if this route has shadow configuration
2269        if let Some(ref route_config) = ctx.route_config {
2270            if let Some(ref shadow_config) = route_config.shadow {
2271                // Get snapshot of upstream pools for shadow manager
2272                let pools_snapshot = self.upstream_pools.snapshot().await;
2273                let upstream_pools = std::sync::Arc::new(pools_snapshot);
2274
2275                // Get route ID for metrics labeling
2276                let route_id = ctx
2277                    .route_id
2278                    .clone()
2279                    .unwrap_or_else(|| "unknown".to_string());
2280
2281                // Create shadow manager
2282                let shadow_manager = crate::shadow::ShadowManager::new(
2283                    upstream_pools,
2284                    shadow_config.clone(),
2285                    Some(std::sync::Arc::clone(&self.metrics)),
2286                    route_id,
2287                );
2288
2289                // Check if we should shadow this request (sampling + header check)
2290                if shadow_manager.should_shadow(upstream_request) {
2291                    trace!(
2292                        correlation_id = %ctx.trace_id,
2293                        shadow_upstream = %shadow_config.upstream,
2294                        percentage = shadow_config.percentage,
2295                        "Shadowing request"
2296                    );
2297
2298                    // Clone headers for shadow request
2299                    let shadow_headers = upstream_request.clone();
2300
2301                    // Create request context for shadow (simplified from proxy context)
2302                    let shadow_ctx = crate::upstream::RequestContext {
2303                        client_ip: ctx.client_ip.parse().ok(),
2304                        headers: std::collections::HashMap::new(), // Empty for now
2305                        path: ctx.path.clone(),
2306                        method: ctx.method.clone(),
2307                    };
2308
2309                    // Determine if we should buffer the body
2310                    let buffer_body = shadow_config.buffer_body
2311                        && crate::shadow::should_buffer_method(&ctx.method);
2312
2313                    if buffer_body {
2314                        // Body buffering requested - defer shadow request until body is available
2315                        // Store shadow info in context; will be fired in logging phase
2316                        // or when request body filter completes
2317                        trace!(
2318                            correlation_id = %ctx.trace_id,
2319                            "Deferring shadow request until body is buffered"
2320                        );
2321                        ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2322                            headers: shadow_headers,
2323                            manager: std::sync::Arc::new(shadow_manager),
2324                            request_ctx: shadow_ctx,
2325                            include_body: true,
2326                        });
2327                        // Enable body inspection to capture the body for shadow
2328                        // (only if not already enabled for other reasons)
2329                        if !ctx.body_inspection_enabled {
2330                            ctx.body_inspection_enabled = true;
2331                            // Set a reasonable buffer limit from shadow config
2332                            // (body_buffer will accumulate chunks)
2333                        }
2334                    } else {
2335                        // No body buffering needed - fire shadow request immediately
2336                        shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2337                        ctx.shadow_sent = true;
2338                    }
2339                }
2340            }
2341        }
2342
2343        Ok(())
2344    }
2345
2346    /// Process response body chunks from upstream.
2347    /// Used for response size tracking and WAF inspection.
2348    ///
2349    /// Note: Response body inspection is currently buffered only (streaming mode not supported
2350    /// for responses due to Pingora's synchronous filter design).
2351    fn response_body_filter(
2352        &self,
2353        _session: &mut Session,
2354        body: &mut Option<Bytes>,
2355        end_of_stream: bool,
2356        ctx: &mut Self::CTX,
2357    ) -> Result<Option<Duration>, Box<Error>> {
2358        // Handle WebSocket frame inspection (server -> client)
2359        // Note: This filter is synchronous, so we use block_in_place for async agent calls
2360        if ctx.is_websocket_upgrade {
2361            if let Some(ref handler) = ctx.websocket_handler {
2362                let handler = handler.clone();
2363                let data = body.take();
2364
2365                // Use block_in_place to run async handler from sync context
2366                // This is safe because Pingora uses a multi-threaded tokio runtime
2367                let result = tokio::task::block_in_place(|| {
2368                    tokio::runtime::Handle::current()
2369                        .block_on(async { handler.process_server_data(data).await })
2370                });
2371
2372                match result {
2373                    crate::websocket::ProcessResult::Forward(data) => {
2374                        *body = data;
2375                    }
2376                    crate::websocket::ProcessResult::Close(reason) => {
2377                        warn!(
2378                            correlation_id = %ctx.trace_id,
2379                            code = reason.code,
2380                            reason = %reason.reason,
2381                            "WebSocket connection closed by agent (server->client)"
2382                        );
2383                        // For sync filter, we can't return an error that closes the connection
2384                        // Instead, inject a close frame
2385                        let close_frame =
2386                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2387                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2388                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2389                            *body = Some(Bytes::from(encoded));
2390                        }
2391                    }
2392                }
2393            }
2394            // Skip normal body processing for WebSocket
2395            return Ok(None);
2396        }
2397
2398        // Process response body through agents (for agents that subscribe to ResponseBody events)
2399        if ctx.response_agent_processing_enabled && !ctx.route_agent_ids.is_empty() {
2400            if let Some(ref chunk) = body {
2401                ctx.response_agent_body_buffer.extend_from_slice(chunk);
2402            }
2403
2404            if end_of_stream {
2405                let agent_ids = ctx.route_agent_ids.clone();
2406                let buffer = std::mem::take(&mut ctx.response_agent_body_buffer);
2407                let chunk_index = 0u32;
2408                let total_size = Some(buffer.len());
2409                let trace_id = ctx.trace_id.clone();
2410                let client_ip = ctx.client_ip.clone();
2411                let host = ctx.host.clone();
2412                let route_id = ctx.route_id.clone();
2413                let upstream_id = ctx.upstream.clone();
2414                let traceparent = ctx.traceparent();
2415                let agent_mgr = self.agent_manager.clone();
2416
2417                // Use block_in_place to run async agent call from sync context
2418                // This is safe because Pingora uses a multi-threaded tokio runtime
2419                let result = tokio::task::block_in_place(|| {
2420                    tokio::runtime::Handle::current().block_on(async {
2421                        let agent_ctx = crate::agents::AgentCallContext {
2422                            correlation_id: grapsus_common::CorrelationId::from_string(&trace_id),
2423                            metadata: grapsus_agent_protocol::RequestMetadata {
2424                                correlation_id: trace_id.clone(),
2425                                request_id: uuid::Uuid::new_v4().to_string(),
2426                                client_ip,
2427                                client_port: 0,
2428                                server_name: host,
2429                                protocol: "HTTP/1.1".to_string(),
2430                                tls_version: None,
2431                                tls_cipher: None,
2432                                route_id: route_id.clone(),
2433                                upstream_id: upstream_id.clone(),
2434                                timestamp: chrono::Utc::now().to_rfc3339(),
2435                                traceparent,
2436                            },
2437                            route_id,
2438                            upstream_id,
2439                            request_body: None,
2440                            response_body: None,
2441                        };
2442
2443                        agent_mgr
2444                            .process_response_body_streaming(
2445                                &agent_ctx,
2446                                &buffer,
2447                                true, // is_last
2448                                chunk_index,
2449                                buffer.len(),
2450                                total_size,
2451                                &agent_ids,
2452                            )
2453                            .await
2454                    })
2455                });
2456
2457                match result {
2458                    Ok(decision) => {
2459                        // Apply response body mutation if present
2460                        if let Some(mutation) = decision.response_body_mutation {
2461                            if let Some(ref data) = mutation.data {
2462                                if !data.is_empty() {
2463                                    // Decode base64 body from agent
2464                                    if let Ok(decoded) = base64::Engine::decode(
2465                                        &base64::engine::general_purpose::STANDARD,
2466                                        data,
2467                                    ) {
2468                                        debug!(
2469                                            correlation_id = %ctx.trace_id,
2470                                            original_size = buffer.len(),
2471                                            new_size = decoded.len(),
2472                                            "Agent replaced response body"
2473                                        );
2474                                        *body = Some(Bytes::from(decoded));
2475                                        ctx.response_agent_body_complete = true;
2476                                    } else {
2477                                        warn!(
2478                                            correlation_id = %ctx.trace_id,
2479                                            "Failed to decode agent response body mutation (invalid base64)"
2480                                        );
2481                                    }
2482                                }
2483                                // Empty data means drop the chunk — leave body as-is
2484                            }
2485                            // None data means pass through unchanged
2486                        }
2487
2488                        // Apply any additional response header modifications
2489                        // Note: Cannot modify response headers here (sync context, headers already sent)
2490                        // Header mods should be done in response_filter via ResponseHeaders event
2491                    }
2492                    Err(e) => {
2493                        warn!(
2494                            correlation_id = %ctx.trace_id,
2495                            error = %e,
2496                            "Agent response body processing failed, passing through original"
2497                        );
2498                    }
2499                }
2500            } else if !end_of_stream {
2501                // Buffer chunks — suppress output until we have the full body
2502                *body = None;
2503                return Ok(None);
2504            }
2505        }
2506
2507        // Track response body size
2508        if let Some(ref chunk) = body {
2509            ctx.response_bytes += chunk.len() as u64;
2510
2511            trace!(
2512                correlation_id = %ctx.trace_id,
2513                chunk_size = chunk.len(),
2514                total_response_bytes = ctx.response_bytes,
2515                end_of_stream = end_of_stream,
2516                "Processing response body chunk"
2517            );
2518
2519            // Process SSE chunks for streaming token counting
2520            if let Some(ref mut counter) = ctx.inference_streaming_counter {
2521                let result = counter.process_chunk(chunk);
2522
2523                if result.content.is_some() || result.is_done {
2524                    trace!(
2525                        correlation_id = %ctx.trace_id,
2526                        has_content = result.content.is_some(),
2527                        is_done = result.is_done,
2528                        chunks_processed = counter.chunks_processed(),
2529                        accumulated_content_len = counter.content().len(),
2530                        "Processed SSE chunk for token counting"
2531                    );
2532                }
2533            }
2534
2535            // Response body inspection (buffered mode only)
2536            // Note: Streaming mode for response bodies is not currently supported
2537            // due to Pingora's synchronous response_body_filter design
2538            if ctx.response_body_inspection_enabled
2539                && !ctx.response_body_inspection_agents.is_empty()
2540            {
2541                let config = ctx
2542                    .config
2543                    .get_or_insert_with(|| self.config_manager.current());
2544                let max_inspection_bytes = config
2545                    .waf
2546                    .as_ref()
2547                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
2548                    .unwrap_or(1024 * 1024);
2549
2550                if ctx.response_body_bytes_inspected < max_inspection_bytes {
2551                    let bytes_to_inspect = std::cmp::min(
2552                        chunk.len() as u64,
2553                        max_inspection_bytes - ctx.response_body_bytes_inspected,
2554                    ) as usize;
2555
2556                    // Buffer for later processing (during logging phase)
2557                    // Response body inspection happens asynchronously and results
2558                    // are logged rather than blocking the response
2559                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2560                    ctx.response_body_chunk_index += 1;
2561
2562                    trace!(
2563                        correlation_id = %ctx.trace_id,
2564                        bytes_inspected = ctx.response_body_bytes_inspected,
2565                        max_inspection_bytes = max_inspection_bytes,
2566                        chunk_index = ctx.response_body_chunk_index,
2567                        "Tracking response body for inspection"
2568                    );
2569                }
2570            }
2571        }
2572
2573        if end_of_stream {
2574            trace!(
2575                correlation_id = %ctx.trace_id,
2576                total_response_bytes = ctx.response_bytes,
2577                response_bytes_inspected = ctx.response_body_bytes_inspected,
2578                "Response body complete"
2579            );
2580        }
2581
2582        // Return None to indicate no delay needed
2583        Ok(None)
2584    }
2585
2586    /// Called when a connection to upstream is established or reused.
2587    /// Logs connection reuse statistics for observability.
2588    async fn connected_to_upstream(
2589        &self,
2590        _session: &mut Session,
2591        reused: bool,
2592        peer: &HttpPeer,
2593        #[cfg(unix)] _fd: RawFd,
2594        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2595        digest: Option<&Digest>,
2596        ctx: &mut Self::CTX,
2597    ) -> Result<(), Box<Error>> {
2598        // Track connection reuse for metrics
2599        ctx.connection_reused = reused;
2600
2601        // Log connection establishment/reuse
2602        if reused {
2603            trace!(
2604                correlation_id = %ctx.trace_id,
2605                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2606                peer_address = %peer.address(),
2607                "Reusing existing upstream connection"
2608            );
2609        } else {
2610            debug!(
2611                correlation_id = %ctx.trace_id,
2612                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2613                peer_address = %peer.address(),
2614                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2615                "Established new upstream connection"
2616            );
2617        }
2618
2619        Ok(())
2620    }
2621
2622    // =========================================================================
2623    // HTTP Caching - Pingora Cache Integration
2624    // =========================================================================
2625
2626    /// Decide if the request should use caching.
2627    ///
2628    /// This method is called early in the request lifecycle to determine if
2629    /// the response should be served from cache or if the response should
2630    /// be cached.
2631    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2632        // Check if route has caching enabled
2633        let route_id = match ctx.route_id.as_deref() {
2634            Some(id) => id,
2635            None => {
2636                trace!(
2637                    correlation_id = %ctx.trace_id,
2638                    "Cache filter: no route ID, skipping cache"
2639                );
2640                return Ok(());
2641            }
2642        };
2643
2644        // Check if caching is enabled for this route
2645        if !self.cache_manager.is_enabled(route_id) {
2646            ctx.cache_status = Some(super::context::CacheStatus::Bypass("disabled"));
2647            trace!(
2648                correlation_id = %ctx.trace_id,
2649                route_id = %route_id,
2650                "Cache disabled for route"
2651            );
2652            return Ok(());
2653        }
2654
2655        // Check if method is cacheable (typically GET/HEAD)
2656        if !self
2657            .cache_manager
2658            .is_method_cacheable(route_id, &ctx.method)
2659        {
2660            ctx.cache_status = Some(super::context::CacheStatus::Bypass("method"));
2661            trace!(
2662                correlation_id = %ctx.trace_id,
2663                route_id = %route_id,
2664                method = %ctx.method,
2665                "Method not cacheable"
2666            );
2667            return Ok(());
2668        }
2669
2670        // Check if path is excluded from caching (by extension or pattern)
2671        if !self.cache_manager.is_path_cacheable(route_id, &ctx.path) {
2672            ctx.cache_status = Some(super::context::CacheStatus::Bypass("excluded"));
2673            trace!(
2674                correlation_id = %ctx.trace_id,
2675                route_id = %route_id,
2676                path = %ctx.path,
2677                "Path excluded from caching"
2678            );
2679            return Ok(());
2680        }
2681
2682        // Enable caching for this request using Pingora's cache infrastructure
2683        debug!(
2684            correlation_id = %ctx.trace_id,
2685            route_id = %route_id,
2686            method = %ctx.method,
2687            path = %ctx.path,
2688            "Enabling HTTP caching for request"
2689        );
2690
2691        // Get static references to cache infrastructure
2692        let storage = get_cache_storage();
2693        let eviction = get_cache_eviction();
2694        let cache_lock = get_cache_lock();
2695
2696        // Enable the cache with storage, eviction, and lock
2697        session.cache.enable(
2698            storage,
2699            Some(eviction),
2700            None, // predictor - optional
2701            Some(cache_lock),
2702            None, // option overrides
2703        );
2704
2705        // Mark request as cache-eligible in context
2706        ctx.cache_eligible = true;
2707
2708        trace!(
2709            correlation_id = %ctx.trace_id,
2710            route_id = %route_id,
2711            cache_enabled = session.cache.enabled(),
2712            "Cache enabled for request"
2713        );
2714
2715        Ok(())
2716    }
2717
2718    /// Generate the cache key for this request.
2719    ///
2720    /// The cache key uniquely identifies the cached response. It typically
2721    /// includes the method, host, path, and potentially query parameters.
2722    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2723        let req_header = session.req_header();
2724        let method = req_header.method.as_str();
2725        let path = req_header.uri.path();
2726        let host = ctx.host.as_deref().unwrap_or("unknown");
2727        let query = req_header.uri.query();
2728
2729        // Generate cache key using our cache manager
2730        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2731
2732        trace!(
2733            correlation_id = %ctx.trace_id,
2734            cache_key = %key_string,
2735            "Generated cache key"
2736        );
2737
2738        // Use Pingora's default cache key generator which handles
2739        // proper hashing and internal format
2740        let host = req_header.uri.authority().map(|a| a.as_str()).unwrap_or("default"); let path_and_query = req_header.uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); Ok(CacheKey::new(host, path_and_query, ""))
2741    }
2742
2743    /// Called when a cache miss occurs.
2744    ///
2745    /// This is called when the cache lookup found no matching entry.
2746    /// We can use this to log and track cache misses.
2747    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2748        // Let Pingora handle the cache miss
2749        session.cache.cache_miss();
2750
2751        ctx.cache_status = Some(super::context::CacheStatus::Miss);
2752
2753        // Track statistics
2754        if let Some(route_id) = ctx.route_id.as_deref() {
2755            self.cache_manager.stats().record_miss();
2756
2757            trace!(
2758                correlation_id = %ctx.trace_id,
2759                route_id = %route_id,
2760                path = %ctx.path,
2761                "Cache miss"
2762            );
2763        }
2764    }
2765
2766    /// Called after a successful cache lookup.
2767    ///
2768    /// This filter allows inspecting the cached response before serving it.
2769    /// Returns `None` to serve the cached response, or a `ForcedFreshness`
2770    /// to override the freshness decision.
2771    async fn cache_hit_filter(
2772        &self,
2773        session: &mut Session,
2774        meta: &CacheMeta,
2775        hit_handler: &mut HitHandler,
2776        is_fresh: bool,
2777        ctx: &mut Self::CTX,
2778    ) -> Result<Option<ForcedFreshness>>
2779    where
2780        Self::CTX: Send + Sync,
2781    {
2782        // Check if this cache entry should be invalidated due to a purge request
2783        let req_header = session.req_header();
2784        let method = req_header.method.as_str();
2785        let path = req_header.uri.path();
2786        let host = req_header.uri.host().unwrap_or("localhost");
2787        let query = req_header.uri.query();
2788
2789        // Generate the cache key for this request
2790        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2791
2792        // Check if this key should be invalidated
2793        if self.cache_manager.should_invalidate(&cache_key) {
2794            info!(
2795                correlation_id = %ctx.trace_id,
2796                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2797                cache_key = %cache_key,
2798                "Cache entry invalidated by purge request"
2799            );
2800            // Force expiration so the entry is refetched from upstream
2801            return Ok(Some(ForcedFreshness::ForceExpired));
2802        }
2803
2804        // Track cache hit statistics
2805        if is_fresh {
2806            // Detect which tier served the hit via downcast
2807            let is_disk_hit = hit_handler
2808                .as_any()
2809                .downcast_ref::<HybridHitHandler>()
2810                .is_some()
2811                || hit_handler
2812                    .as_any()
2813                    .downcast_ref::<DiskHitHandler>()
2814                    .is_some();
2815
2816            let stats = self.cache_manager.stats();
2817            if is_disk_hit {
2818                ctx.cache_status = Some(super::context::CacheStatus::HitDisk);
2819                stats.record_disk_hit();
2820            } else {
2821                ctx.cache_status = Some(super::context::CacheStatus::HitMemory);
2822                stats.record_memory_hit();
2823            }
2824            stats.record_hit();
2825
2826            debug!(
2827                correlation_id = %ctx.trace_id,
2828                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2829                is_fresh = is_fresh,
2830                tier = if is_disk_hit { "disk" } else { "memory" },
2831                "Cache hit (fresh)"
2832            );
2833        } else {
2834            ctx.cache_status = Some(super::context::CacheStatus::HitStale);
2835
2836            trace!(
2837                correlation_id = %ctx.trace_id,
2838                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2839                is_fresh = is_fresh,
2840                "Cache hit (stale)"
2841            );
2842        }
2843
2844        // Serve the cached response without invalidation
2845        Ok(None)
2846    }
2847
2848    /// Decide if the response should be cached.
2849    ///
2850    /// Called after receiving the response from upstream to determine
2851    /// if it should be stored in the cache.
2852    fn response_cache_filter(
2853        &self,
2854        _session: &Session,
2855        resp: &ResponseHeader,
2856        ctx: &mut Self::CTX,
2857    ) -> Result<RespCacheable> {
2858        let route_id = match ctx.route_id.as_deref() {
2859            Some(id) => id,
2860            None => {
2861                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2862                    "no_route",
2863                )));
2864            }
2865        };
2866
2867        // Check if caching is enabled for this route
2868        if !self.cache_manager.is_enabled(route_id) {
2869            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2870                "disabled",
2871            )));
2872        }
2873
2874        let status = resp.status.as_u16();
2875
2876        // Check if status code is cacheable
2877        if !self.cache_manager.is_status_cacheable(route_id, status) {
2878            trace!(
2879                correlation_id = %ctx.trace_id,
2880                route_id = %route_id,
2881                status = status,
2882                "Status code not cacheable"
2883            );
2884            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2885        }
2886
2887        // Check Cache-Control header for no-store, no-cache, private
2888        if let Some(cache_control) = resp.headers.get("cache-control") {
2889            if let Ok(cc_str) = cache_control.to_str() {
2890                if crate::cache::CacheManager::is_no_cache(cc_str) {
2891                    trace!(
2892                        correlation_id = %ctx.trace_id,
2893                        route_id = %route_id,
2894                        cache_control = %cc_str,
2895                        "Response has no-cache directive"
2896                    );
2897                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2898                }
2899            }
2900        }
2901
2902        // Calculate TTL from Cache-Control or use default
2903        let cache_control = resp
2904            .headers
2905            .get("cache-control")
2906            .and_then(|v| v.to_str().ok());
2907        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2908
2909        if ttl.is_zero() {
2910            trace!(
2911                correlation_id = %ctx.trace_id,
2912                route_id = %route_id,
2913                "TTL is zero, not caching"
2914            );
2915            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2916        }
2917
2918        // Get route cache config for stale settings
2919        let config = self
2920            .cache_manager
2921            .get_route_config(route_id)
2922            .unwrap_or_default();
2923
2924        // Create timestamps for cache metadata
2925        let now = std::time::SystemTime::now();
2926        let fresh_until = now + ttl;
2927
2928        // Clone the response header for storage
2929        let header = resp.clone();
2930
2931        // Create CacheMeta with proper timestamps and TTLs
2932        let cache_meta = CacheMeta::new(
2933            fresh_until,
2934            now,
2935            config.stale_while_revalidate_secs as u32,
2936            config.stale_if_error_secs as u32,
2937            header,
2938        );
2939
2940        // Track the cache store
2941        self.cache_manager.stats().record_store();
2942
2943        debug!(
2944            correlation_id = %ctx.trace_id,
2945            route_id = %route_id,
2946            status = status,
2947            ttl_secs = ttl.as_secs(),
2948            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2949            stale_if_error_secs = config.stale_if_error_secs,
2950            "Caching response"
2951        );
2952
2953        Ok(RespCacheable::Cacheable(cache_meta))
2954    }
2955
2956    /// Decide whether to serve stale content on error or during revalidation.
2957    ///
2958    /// This implements stale-while-revalidate and stale-if-error semantics.
2959    fn should_serve_stale(
2960        &self,
2961        _session: &mut Session,
2962        ctx: &mut Self::CTX,
2963        error: Option<&Error>,
2964    ) -> bool {
2965        let route_id = match ctx.route_id.as_deref() {
2966            Some(id) => id,
2967            None => return false,
2968        };
2969
2970        // Get route cache config for stale settings
2971        let config = match self.cache_manager.get_route_config(route_id) {
2972            Some(c) => c,
2973            None => return false,
2974        };
2975
2976        // If there's an upstream error, use stale-if-error
2977        if let Some(e) = error {
2978            // Only serve stale for upstream errors
2979            if e.esource() == &pingora::ErrorSource::Upstream {
2980                debug!(
2981                    correlation_id = %ctx.trace_id,
2982                    route_id = %route_id,
2983                    error = %e,
2984                    stale_if_error_secs = config.stale_if_error_secs,
2985                    "Considering stale-if-error"
2986                );
2987                return config.stale_if_error_secs > 0;
2988            }
2989        }
2990
2991        // During stale-while-revalidate (error is None)
2992        if error.is_none() && config.stale_while_revalidate_secs > 0 {
2993            trace!(
2994                correlation_id = %ctx.trace_id,
2995                route_id = %route_id,
2996                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2997                "Allowing stale-while-revalidate"
2998            );
2999            return true;
3000        }
3001
3002        false
3003    }
3004
3005    /// Handle Range header for byte-range requests (streaming support).
3006    ///
3007    /// This method is called when a Range header is present in the request.
3008    /// It allows proper handling of:
3009    /// - Video streaming (HTML5 video seeking)
3010    /// - Large file downloads with resume support
3011    /// - Partial content delivery
3012    ///
3013    /// Uses Pingora's built-in range handling with route-specific logging.
3014    fn range_header_filter(
3015        &self,
3016        session: &mut Session,
3017        response: &mut ResponseHeader,
3018        ctx: &mut Self::CTX,
3019    ) -> pingora_proxy::RangeType
3020    where
3021        Self::CTX: Send + Sync,
3022    {
3023        // Check if route supports range requests
3024        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
3025            // Static file routes and media routes should support range requests
3026            matches!(
3027                config.service_type,
3028                grapsus_config::ServiceType::Static | grapsus_config::ServiceType::Web
3029            )
3030        });
3031
3032        if !supports_range {
3033            trace!(
3034                correlation_id = %ctx.trace_id,
3035                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3036                "Range request not supported for this route type"
3037            );
3038            return pingora_proxy::RangeType::None;
3039        }
3040
3041        // Use Pingora's built-in range header parsing and handling
3042        let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
3043
3044        match &range_type {
3045            pingora_proxy::RangeType::None => {
3046                trace!(
3047                    correlation_id = %ctx.trace_id,
3048                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3049                    "No range request or not applicable"
3050                );
3051            }
3052            pingora_proxy::RangeType::Single(range) => {
3053                trace!(
3054                    correlation_id = %ctx.trace_id,
3055                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3056                    range_start = range.start,
3057                    range_end = range.end,
3058                    "Processing single-range request"
3059                );
3060            }
3061            pingora_proxy::RangeType::Multi(multi) => {
3062                trace!(
3063                    correlation_id = %ctx.trace_id,
3064                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3065                    range_count = multi.ranges.len(),
3066                    "Processing multi-range request"
3067                );
3068            }
3069            pingora_proxy::RangeType::Invalid => {
3070                debug!(
3071                    correlation_id = %ctx.trace_id,
3072                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3073                    "Invalid range header"
3074                );
3075            }
3076        }
3077
3078        range_type
3079    }
3080
3081    /// Handle fatal proxy errors by generating custom error pages.
3082    /// Called when the proxy itself fails to process the request.
3083    async fn fail_to_proxy(
3084        &self,
3085        session: &mut Session,
3086        e: &Error,
3087        ctx: &mut Self::CTX,
3088    ) -> pingora_proxy::FailToProxy
3089    where
3090        Self::CTX: Send + Sync,
3091    {
3092        let error_code = match e.etype() {
3093            // Connection errors
3094            ErrorType::ConnectRefused => 503,
3095            ErrorType::ConnectTimedout => 504,
3096            ErrorType::ConnectNoRoute => 502,
3097
3098            // Timeout errors
3099            ErrorType::ReadTimedout => 504,
3100            ErrorType::WriteTimedout => 504,
3101
3102            // TLS errors
3103            ErrorType::TLSHandshakeFailure => 502,
3104            ErrorType::InvalidCert => 502,
3105
3106            // Protocol errors
3107            ErrorType::InvalidHTTPHeader => 400,
3108            ErrorType::H2Error => 502,
3109
3110            // Resource errors
3111            ErrorType::ConnectProxyFailure => 502,
3112            ErrorType::ConnectionClosed => 502,
3113
3114            // Explicit HTTP status (e.g., from agent fail-closed blocking)
3115            ErrorType::HTTPStatus(status) => *status,
3116
3117            // Internal errors - return 502 for upstream issues (more accurate than 500)
3118            ErrorType::InternalError => {
3119                // Check if this is an upstream-related error
3120                let error_str = e.to_string();
3121                if error_str.contains("upstream")
3122                    || error_str.contains("DNS")
3123                    || error_str.contains("resolve")
3124                {
3125                    502
3126                } else {
3127                    500
3128                }
3129            }
3130
3131            // Default to 502 for unknown errors
3132            _ => 502,
3133        };
3134
3135        error!(
3136            correlation_id = %ctx.trace_id,
3137            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3138            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
3139            error_type = ?e.etype(),
3140            error = %e,
3141            error_code = error_code,
3142            "Proxy error occurred"
3143        );
3144
3145        // Record the error in metrics
3146        self.metrics
3147            .record_blocked_request(&format!("proxy_error_{}", error_code));
3148
3149        // Write error response to ensure client receives a proper HTTP response
3150        // This is necessary because some errors occur before the upstream connection
3151        // is established, and Pingora may not send a response automatically
3152        let error_message = match error_code {
3153            400 => "Bad Request",
3154            502 => "Bad Gateway",
3155            503 => "Service Unavailable",
3156            504 => "Gateway Timeout",
3157            _ => "Internal Server Error",
3158        };
3159
3160        // Build a minimal error response body
3161        let body = format!(
3162            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
3163            error_code, error_message, ctx.trace_id
3164        );
3165
3166        // Write the response header
3167        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
3168        header
3169            .insert_header("Content-Type", "application/json")
3170            .ok();
3171        header
3172            .insert_header("Content-Length", body.len().to_string())
3173            .ok();
3174        header
3175            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
3176            .ok();
3177        header.insert_header("Connection", "close").ok();
3178
3179        // Write headers and body
3180        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
3181            warn!(
3182                correlation_id = %ctx.trace_id,
3183                error = %write_err,
3184                "Failed to write error response header"
3185            );
3186        } else {
3187            // Write the body
3188            if let Err(write_err) = session
3189                .write_response_body(Some(bytes::Bytes::from(body)), true)
3190                .await
3191            {
3192                warn!(
3193                    correlation_id = %ctx.trace_id,
3194                    error = %write_err,
3195                    "Failed to write error response body"
3196                );
3197            }
3198        }
3199
3200        // Return the error response info
3201        // can_reuse_downstream: false since we already wrote and closed the response
3202        pingora_proxy::FailToProxy {
3203            error_code,
3204            can_reuse_downstream: false,
3205        }
3206    }
3207
3208    /// Handle errors that occur during proxying after upstream connection is established.
3209    ///
3210    /// This method enables retry logic and circuit breaker integration.
3211    /// It's called when an error occurs during the request/response exchange
3212    /// with the upstream server.
3213    fn error_while_proxy(
3214        &self,
3215        peer: &HttpPeer,
3216        session: &mut Session,
3217        e: Box<Error>,
3218        ctx: &mut Self::CTX,
3219        client_reused: bool,
3220    ) -> Box<Error> {
3221        let error_type = e.etype().clone();
3222        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
3223
3224        // Classify error for retry decisions
3225        let is_retryable = matches!(
3226            error_type,
3227            ErrorType::ConnectTimedout
3228                | ErrorType::ReadTimedout
3229                | ErrorType::WriteTimedout
3230                | ErrorType::ConnectionClosed
3231                | ErrorType::ConnectRefused
3232        );
3233
3234        // Log the error with context
3235        warn!(
3236            correlation_id = %ctx.trace_id,
3237            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3238            upstream = %upstream_id,
3239            peer_address = %peer.address(),
3240            error_type = ?error_type,
3241            error = %e,
3242            client_reused = client_reused,
3243            is_retryable = is_retryable,
3244            "Error during proxy operation"
3245        );
3246
3247        // Record failure with circuit breaker via upstream pool
3248        // This is done asynchronously since we can't await in a sync fn
3249        let peer_address = peer.address().to_string();
3250        let upstream_pools = self.upstream_pools.clone();
3251        let upstream_id_owned = upstream_id.to_string();
3252        tokio::spawn(async move {
3253            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
3254                pool.report_result(&peer_address, false).await;
3255            }
3256        });
3257
3258        // Metrics tracking
3259        self.metrics
3260            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
3261
3262        // Create enhanced error with retry information
3263        let mut enhanced_error = e.more_context(format!(
3264            "Upstream: {}, Peer: {}, Attempts: {}",
3265            upstream_id,
3266            peer.address(),
3267            ctx.upstream_attempts
3268        ));
3269
3270        // Determine if retry should be attempted:
3271        // - Only retry if error is retryable type
3272        // - Only retry reused connections if buffer isn't truncated
3273        // - Track retry metrics
3274        if is_retryable {
3275            let can_retry = if client_reused {
3276                // For reused connections, check if retry buffer is intact
3277                !session.as_ref().retry_buffer_truncated()
3278            } else {
3279                // Fresh connections can always retry
3280                true
3281            };
3282
3283            enhanced_error.retry.decide_reuse(can_retry);
3284
3285            if can_retry {
3286                debug!(
3287                    correlation_id = %ctx.trace_id,
3288                    upstream = %upstream_id,
3289                    error_type = ?error_type,
3290                    "Error is retryable, will attempt retry"
3291                );
3292            }
3293        } else {
3294            // Non-retryable error - don't retry
3295            enhanced_error.retry.decide_reuse(false);
3296        }
3297
3298        enhanced_error
3299    }
3300
3301    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
3302        // Decrement active requests
3303        self.reload_coordinator.dec_requests();
3304
3305        // === Fire pending shadow request (if body buffering was enabled) ===
3306        if !ctx.shadow_sent {
3307            if let Some(shadow_pending) = ctx.shadow_pending.take() {
3308                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
3309                    // Clone the buffered body for the shadow request
3310                    Some(ctx.body_buffer.clone())
3311                } else {
3312                    None
3313                };
3314
3315                trace!(
3316                    correlation_id = %ctx.trace_id,
3317                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
3318                    "Firing deferred shadow request with buffered body"
3319                );
3320
3321                shadow_pending.manager.shadow_request(
3322                    shadow_pending.headers,
3323                    body,
3324                    shadow_pending.request_ctx,
3325                );
3326                ctx.shadow_sent = true;
3327            }
3328        }
3329
3330        let duration = ctx.elapsed();
3331
3332        // Get response status
3333        let status = session
3334            .response_written()
3335            .map(|r| r.status.as_u16())
3336            .unwrap_or(0);
3337
3338        // Report result to load balancer for adaptive LB feedback
3339        // This enables latency-aware weight adjustment
3340        if let (Some(ref peer_addr), Some(ref upstream_id)) =
3341            (&ctx.selected_upstream_address, &ctx.upstream)
3342        {
3343            // Success = status code < 500 (client errors are not upstream failures)
3344            let success = status > 0 && status < 500;
3345
3346            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
3347                pool.report_result_with_latency(peer_addr, success, Some(duration))
3348                    .await;
3349                trace!(
3350                    correlation_id = %ctx.trace_id,
3351                    upstream = %upstream_id,
3352                    peer_address = %peer_addr,
3353                    success = success,
3354                    duration_ms = duration.as_millis(),
3355                    status = status,
3356                    "Reported result to adaptive load balancer"
3357                );
3358            }
3359
3360            // Track warmth for inference routes (cold model detection)
3361            if ctx.inference_rate_limit_enabled && success {
3362                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
3363                if cold_detected {
3364                    debug!(
3365                        correlation_id = %ctx.trace_id,
3366                        upstream = %upstream_id,
3367                        peer_address = %peer_addr,
3368                        duration_ms = duration.as_millis(),
3369                        "Cold model detected on inference upstream"
3370                    );
3371                }
3372            }
3373        }
3374
3375        // Record actual token usage for inference rate limiting
3376        // This adjusts the token bucket based on actual vs estimated tokens
3377        if ctx.inference_rate_limit_enabled {
3378            if let (Some(route_id), Some(ref rate_limit_key)) =
3379                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
3380            {
3381                // Try to extract actual tokens from response headers
3382                let response_headers = session
3383                    .response_written()
3384                    .map(|r| &r.headers)
3385                    .cloned()
3386                    .unwrap_or_default();
3387
3388                // For streaming responses, finalize the streaming token counter
3389                let streaming_result = if ctx.inference_streaming_response {
3390                    ctx.inference_streaming_counter
3391                        .as_ref()
3392                        .map(|counter| counter.finalize())
3393                } else {
3394                    None
3395                };
3396
3397                // Log streaming token count info
3398                if let Some(ref result) = streaming_result {
3399                    debug!(
3400                        correlation_id = %ctx.trace_id,
3401                        output_tokens = result.output_tokens,
3402                        input_tokens = ?result.input_tokens,
3403                        source = ?result.source,
3404                        content_length = result.content_length,
3405                        "Finalized streaming token count"
3406                    );
3407                }
3408
3409                // PII detection guardrail (for streaming inference responses)
3410                if ctx.inference_streaming_response {
3411                    if let Some(ref route_config) = ctx.route_config {
3412                        if let Some(ref inference) = route_config.inference {
3413                            if let Some(ref guardrails) = inference.guardrails {
3414                                if let Some(ref pii_config) = guardrails.pii_detection {
3415                                    if pii_config.enabled {
3416                                        // Get accumulated content from streaming counter
3417                                        if let Some(ref counter) = ctx.inference_streaming_counter {
3418                                            let response_content = counter.content();
3419                                            if !response_content.is_empty() {
3420                                                let pii_result = self
3421                                                    .guardrail_processor
3422                                                    .check_pii(
3423                                                        pii_config,
3424                                                        response_content,
3425                                                        ctx.route_id.as_deref(),
3426                                                        &ctx.trace_id,
3427                                                    )
3428                                                    .await;
3429
3430                                                match pii_result {
3431                                                    crate::inference::PiiCheckResult::Detected {
3432                                                        detections,
3433                                                        redacted_content: _,
3434                                                    } => {
3435                                                        warn!(
3436                                                            correlation_id = %ctx.trace_id,
3437                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3438                                                            detection_count = detections.len(),
3439                                                            "PII detected in inference response"
3440                                                        );
3441
3442                                                        // Store detection categories for logging
3443                                                        ctx.pii_detection_categories = detections
3444                                                            .iter()
3445                                                            .map(|d| d.category.clone())
3446                                                            .collect();
3447
3448                                                        // Record metrics for each category
3449                                                        for detection in &detections {
3450                                                            self.metrics.record_pii_detected(
3451                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
3452                                                                &detection.category,
3453                                                            );
3454                                                        }
3455                                                    }
3456                                                    crate::inference::PiiCheckResult::Clean => {
3457                                                        trace!(
3458                                                            correlation_id = %ctx.trace_id,
3459                                                            "No PII detected in response"
3460                                                        );
3461                                                    }
3462                                                    crate::inference::PiiCheckResult::Error { message } => {
3463                                                        debug!(
3464                                                            correlation_id = %ctx.trace_id,
3465                                                            error = %message,
3466                                                            "PII detection check failed"
3467                                                        );
3468                                                    }
3469                                                }
3470                                            }
3471                                        }
3472                                    }
3473                                }
3474                            }
3475                        }
3476                    }
3477                }
3478
3479                // Response body would require buffering, which is expensive
3480                // For non-streaming, most LLM APIs provide token counts in headers
3481                // For streaming, we use the accumulated SSE content
3482                let empty_body: &[u8] = &[];
3483
3484                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
3485                    route_id,
3486                    rate_limit_key,
3487                    &response_headers,
3488                    empty_body,
3489                    ctx.inference_estimated_tokens,
3490                ) {
3491                    // Use streaming result if available and header extraction failed
3492                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result
3493                    {
3494                        // Prefer API-provided counts from streaming, otherwise use tiktoken count
3495                        if streaming.total_tokens.is_some() {
3496                            (streaming.total_tokens.unwrap(), "streaming_api")
3497                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated
3498                        {
3499                            // Header extraction failed, use streaming tiktoken count
3500                            // Estimate total by adding input estimate + output from streaming
3501                            let total = ctx.inference_input_tokens + streaming.output_tokens;
3502                            (total, "streaming_tiktoken")
3503                        } else {
3504                            (actual_estimate.tokens, "headers")
3505                        }
3506                    } else {
3507                        (actual_estimate.tokens, "headers")
3508                    };
3509
3510                    ctx.inference_actual_tokens = Some(actual_tokens);
3511
3512                    debug!(
3513                        correlation_id = %ctx.trace_id,
3514                        route_id = route_id,
3515                        estimated_tokens = ctx.inference_estimated_tokens,
3516                        actual_tokens = actual_tokens,
3517                        source = source_info,
3518                        streaming_response = ctx.inference_streaming_response,
3519                        model = ?ctx.inference_model,
3520                        "Recorded actual inference tokens"
3521                    );
3522
3523                    // Record budget usage with actual tokens (if budget tracking enabled)
3524                    if ctx.inference_budget_enabled {
3525                        let alerts = self.inference_rate_limit_manager.record_budget(
3526                            route_id,
3527                            rate_limit_key,
3528                            actual_tokens,
3529                        );
3530
3531                        // Log any budget alerts that fired
3532                        for alert in alerts.iter() {
3533                            warn!(
3534                                correlation_id = %ctx.trace_id,
3535                                route_id = route_id,
3536                                tenant = %alert.tenant,
3537                                threshold_pct = alert.threshold * 100.0,
3538                                tokens_used = alert.tokens_used,
3539                                tokens_limit = alert.tokens_limit,
3540                                "Token budget alert threshold crossed"
3541                            );
3542                        }
3543
3544                        // Update context with remaining budget
3545                        if let Some(status) = self
3546                            .inference_rate_limit_manager
3547                            .budget_status(route_id, rate_limit_key)
3548                        {
3549                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
3550                        }
3551                    }
3552
3553                    // Calculate cost if cost attribution is enabled
3554                    if ctx.inference_cost_enabled {
3555                        if let Some(model) = ctx.inference_model.as_deref() {
3556                            // Use streaming result for more accurate input/output split if available
3557                            let (input_tokens, output_tokens) = if let Some(ref streaming) =
3558                                streaming_result
3559                            {
3560                                // Streaming gives us accurate output tokens
3561                                let input =
3562                                    streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
3563                                let output = streaming.output_tokens;
3564                                (input, output)
3565                            } else {
3566                                // Fallback: estimate output from total - input
3567                                let input = ctx.inference_input_tokens;
3568                                let output = actual_tokens.saturating_sub(input);
3569                                (input, output)
3570                            };
3571                            ctx.inference_output_tokens = output_tokens;
3572
3573                            if let Some(cost_result) = self
3574                                .inference_rate_limit_manager
3575                                .calculate_cost(route_id, model, input_tokens, output_tokens)
3576                            {
3577                                ctx.inference_request_cost = Some(cost_result.total_cost);
3578
3579                                trace!(
3580                                    correlation_id = %ctx.trace_id,
3581                                    route_id = route_id,
3582                                    model = model,
3583                                    input_tokens = input_tokens,
3584                                    output_tokens = output_tokens,
3585                                    total_cost = cost_result.total_cost,
3586                                    currency = %cost_result.currency,
3587                                    "Calculated inference request cost"
3588                                );
3589                            }
3590                        }
3591                    }
3592                }
3593            }
3594        }
3595
3596        // Write to access log file if configured (check sampling before allocating entry)
3597        if self.log_manager.should_log_access(status) {
3598            let access_entry = AccessLogEntry {
3599                timestamp: chrono::Utc::now().to_rfc3339(),
3600                trace_id: ctx.trace_id.clone(),
3601                method: ctx.method.clone(),
3602                path: ctx.path.clone(),
3603                query: ctx.query.clone(),
3604                protocol: "HTTP/1.1".to_string(),
3605                status,
3606                body_bytes: ctx.response_bytes,
3607                duration_ms: duration.as_millis() as u64,
3608                client_ip: ctx.client_ip.clone(),
3609                user_agent: ctx.user_agent.clone(),
3610                referer: ctx.referer.clone(),
3611                host: ctx.host.clone(),
3612                route_id: ctx.route_id.clone(),
3613                upstream: ctx.upstream.clone(),
3614                upstream_attempts: ctx.upstream_attempts,
3615                instance_id: self.app_state.instance_id.clone(),
3616                namespace: ctx.namespace.clone(),
3617                service: ctx.service.clone(),
3618                // New fields
3619                body_bytes_sent: ctx.response_bytes,
3620                upstream_addr: ctx.selected_upstream_address.clone(),
3621                connection_reused: ctx.connection_reused,
3622                rate_limit_hit: status == 429,
3623                geo_country: ctx.geo_country_code.clone(),
3624            };
3625            self.log_manager.log_access(&access_entry);
3626        }
3627
3628        // Log to tracing at debug level (avoid allocations if debug disabled)
3629        if tracing::enabled!(tracing::Level::DEBUG) {
3630            debug!(
3631                trace_id = %ctx.trace_id,
3632                method = %ctx.method,
3633                path = %ctx.path,
3634                route_id = ?ctx.route_id,
3635                upstream = ?ctx.upstream,
3636                status = status,
3637                duration_ms = duration.as_millis() as u64,
3638                upstream_attempts = ctx.upstream_attempts,
3639                error = ?_error.map(|e| e.to_string()),
3640                "Request completed"
3641            );
3642        }
3643
3644        // Log WebSocket upgrades at info level
3645        if ctx.is_websocket_upgrade && status == 101 {
3646            info!(
3647                trace_id = %ctx.trace_id,
3648                route_id = ?ctx.route_id,
3649                upstream = ?ctx.upstream,
3650                client_ip = %ctx.client_ip,
3651                "WebSocket connection established"
3652            );
3653        }
3654
3655        // End OpenTelemetry span
3656        if let Some(span) = ctx.otel_span.take() {
3657            span.end();
3658        }
3659    }
3660}
3661
3662// =============================================================================
3663// Helper methods for body streaming (not part of ProxyHttp trait)
3664// =============================================================================
3665
3666impl GrapsusProxy {
3667    /// Process a single body chunk in streaming mode.
3668    async fn process_body_chunk_streaming(
3669        &self,
3670        body: &mut Option<Bytes>,
3671        end_of_stream: bool,
3672        ctx: &mut RequestContext,
3673    ) -> Result<(), Box<Error>> {
3674        // Clone the chunk data to avoid borrowing issues when mutating body later
3675        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
3676        let chunk_index = ctx.request_body_chunk_index;
3677        ctx.request_body_chunk_index += 1;
3678        ctx.body_bytes_inspected += chunk_data.len() as u64;
3679
3680        debug!(
3681            correlation_id = %ctx.trace_id,
3682            chunk_index = chunk_index,
3683            chunk_size = chunk_data.len(),
3684            end_of_stream = end_of_stream,
3685            "Streaming body chunk to agents"
3686        );
3687
3688        // Create agent call context
3689        let agent_ctx = crate::agents::AgentCallContext {
3690            correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
3691            metadata: grapsus_agent_protocol::RequestMetadata {
3692                correlation_id: ctx.trace_id.clone(),
3693                request_id: ctx.trace_id.clone(),
3694                client_ip: ctx.client_ip.clone(),
3695                client_port: 0,
3696                server_name: ctx.host.clone(),
3697                protocol: "HTTP/1.1".to_string(),
3698                tls_version: None,
3699                tls_cipher: None,
3700                route_id: ctx.route_id.clone(),
3701                upstream_id: ctx.upstream.clone(),
3702                timestamp: chrono::Utc::now().to_rfc3339(),
3703                traceparent: ctx.traceparent(),
3704            },
3705            route_id: ctx.route_id.clone(),
3706            upstream_id: ctx.upstream.clone(),
3707            request_body: None, // Not used in streaming mode
3708            response_body: None,
3709        };
3710
3711        let agent_ids = ctx.body_inspection_agents.clone();
3712        let total_size = None; // Unknown in streaming mode
3713
3714        match self
3715            .agent_manager
3716            .process_request_body_streaming(
3717                &agent_ctx,
3718                &chunk_data,
3719                end_of_stream,
3720                chunk_index,
3721                ctx.body_bytes_inspected as usize,
3722                total_size,
3723                &agent_ids,
3724            )
3725            .await
3726        {
3727            Ok(decision) => {
3728                // Track if agent needs more data
3729                ctx.agent_needs_more = decision.needs_more;
3730
3731                // Apply body mutation if present
3732                if let Some(ref mutation) = decision.request_body_mutation {
3733                    if !mutation.is_pass_through() {
3734                        if mutation.is_drop() {
3735                            // Drop the chunk
3736                            *body = None;
3737                            trace!(
3738                                correlation_id = %ctx.trace_id,
3739                                chunk_index = chunk_index,
3740                                "Agent dropped body chunk"
3741                            );
3742                        } else if let Some(ref new_data) = mutation.data {
3743                            // Replace chunk with mutated content
3744                            *body = Some(Bytes::from(new_data.clone()));
3745                            trace!(
3746                                correlation_id = %ctx.trace_id,
3747                                chunk_index = chunk_index,
3748                                original_size = chunk_data.len(),
3749                                new_size = new_data.len(),
3750                                "Agent mutated body chunk"
3751                            );
3752                        }
3753                    }
3754                }
3755
3756                // Check decision (only final if needs_more is false)
3757                if !decision.needs_more && !decision.is_allow() {
3758                    warn!(
3759                        correlation_id = %ctx.trace_id,
3760                        action = ?decision.action,
3761                        "Agent blocked request body"
3762                    );
3763                    self.metrics.record_blocked_request("agent_body_inspection");
3764
3765                    let (status, message) = match &decision.action {
3766                        crate::agents::AgentAction::Block { status, body, .. } => (
3767                            *status,
3768                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3769                        ),
3770                        _ => (403, "Forbidden".to_string()),
3771                    };
3772
3773                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3774                }
3775
3776                trace!(
3777                    correlation_id = %ctx.trace_id,
3778                    needs_more = decision.needs_more,
3779                    "Agent processed body chunk"
3780                );
3781            }
3782            Err(e) => {
3783                let fail_closed = ctx
3784                    .route_config
3785                    .as_ref()
3786                    .map(|r| r.policies.failure_mode == grapsus_config::FailureMode::Closed)
3787                    .unwrap_or(false);
3788
3789                if fail_closed {
3790                    error!(
3791                        correlation_id = %ctx.trace_id,
3792                        error = %e,
3793                        "Agent streaming body inspection failed, blocking (fail-closed)"
3794                    );
3795                    self.log_manager.log_request_error(
3796                        "error",
3797                        "Agent streaming body inspection failed, blocking (fail-closed)",
3798                        &ctx.trace_id,
3799                        ctx.route_id.as_deref(),
3800                        ctx.upstream.as_deref(),
3801                        Some(format!("error={}", e)),
3802                    );
3803                    return Err(Error::explain(
3804                        ErrorType::HTTPStatus(503),
3805                        "Service unavailable",
3806                    ));
3807                } else {
3808                    warn!(
3809                        correlation_id = %ctx.trace_id,
3810                        error = %e,
3811                        "Agent streaming body inspection failed, allowing (fail-open)"
3812                    );
3813                    self.log_manager.log_request_error(
3814                        "warn",
3815                        "Agent streaming body inspection failed, allowing (fail-open)",
3816                        &ctx.trace_id,
3817                        ctx.route_id.as_deref(),
3818                        ctx.upstream.as_deref(),
3819                        Some(format!("error={}", e)),
3820                    );
3821                }
3822            }
3823        }
3824
3825        Ok(())
3826    }
3827
3828    /// Send buffered body to agents (buffer mode).
3829    async fn send_buffered_body_to_agents(
3830        &self,
3831        end_of_stream: bool,
3832        ctx: &mut RequestContext,
3833    ) -> Result<(), Box<Error>> {
3834        debug!(
3835            correlation_id = %ctx.trace_id,
3836            buffer_size = ctx.body_buffer.len(),
3837            end_of_stream = end_of_stream,
3838            agent_count = ctx.body_inspection_agents.len(),
3839            decompression_enabled = ctx.decompression_enabled,
3840            "Sending buffered body to agents for inspection"
3841        );
3842
3843        // Decompress body if enabled and we have a supported encoding
3844        let body_for_inspection = if ctx.decompression_enabled {
3845            if let Some(ref encoding) = ctx.body_content_encoding {
3846                let config = crate::decompression::DecompressionConfig {
3847                    max_ratio: ctx.max_decompression_ratio,
3848                    max_output_bytes: ctx.max_decompression_bytes,
3849                };
3850
3851                match crate::decompression::decompress_body(&ctx.body_buffer, encoding, &config) {
3852                    Ok(result) => {
3853                        ctx.body_was_decompressed = true;
3854                        self.metrics
3855                            .record_decompression_success(encoding, result.ratio);
3856                        debug!(
3857                            correlation_id = %ctx.trace_id,
3858                            encoding = %encoding,
3859                            compressed_size = result.compressed_size,
3860                            decompressed_size = result.decompressed_size,
3861                            ratio = result.ratio,
3862                            "Body decompressed for agent inspection"
3863                        );
3864                        result.data
3865                    }
3866                    Err(e) => {
3867                        // Record failure metric
3868                        let failure_reason = match &e {
3869                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
3870                                "ratio_exceeded"
3871                            }
3872                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
3873                                "size_exceeded"
3874                            }
3875                            crate::decompression::DecompressionError::InvalidData { .. } => {
3876                                "invalid_data"
3877                            }
3878                            crate::decompression::DecompressionError::UnsupportedEncoding {
3879                                ..
3880                            } => "unsupported",
3881                            crate::decompression::DecompressionError::IoError(_) => "io_error",
3882                        };
3883                        self.metrics
3884                            .record_decompression_failure(encoding, failure_reason);
3885
3886                        // Decompression failed - decide based on failure mode
3887                        let fail_closed = ctx
3888                            .route_config
3889                            .as_ref()
3890                            .map(|r| {
3891                                r.policies.failure_mode == grapsus_config::FailureMode::Closed
3892                            })
3893                            .unwrap_or(false);
3894
3895                        if fail_closed {
3896                            error!(
3897                                correlation_id = %ctx.trace_id,
3898                                error = %e,
3899                                encoding = %encoding,
3900                                "Decompression failed, blocking (fail-closed)"
3901                            );
3902                            return Err(Error::explain(
3903                                ErrorType::HTTPStatus(400),
3904                                "Invalid compressed body",
3905                            ));
3906                        } else {
3907                            warn!(
3908                                correlation_id = %ctx.trace_id,
3909                                error = %e,
3910                                encoding = %encoding,
3911                                "Decompression failed, sending compressed body (fail-open)"
3912                            );
3913                            ctx.body_buffer.clone()
3914                        }
3915                    }
3916                }
3917            } else {
3918                ctx.body_buffer.clone()
3919            }
3920        } else {
3921            ctx.body_buffer.clone()
3922        };
3923
3924        let agent_ctx = crate::agents::AgentCallContext {
3925            correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
3926            metadata: grapsus_agent_protocol::RequestMetadata {
3927                correlation_id: ctx.trace_id.clone(),
3928                request_id: ctx.trace_id.clone(),
3929                client_ip: ctx.client_ip.clone(),
3930                client_port: 0,
3931                server_name: ctx.host.clone(),
3932                protocol: "HTTP/1.1".to_string(),
3933                tls_version: None,
3934                tls_cipher: None,
3935                route_id: ctx.route_id.clone(),
3936                upstream_id: ctx.upstream.clone(),
3937                timestamp: chrono::Utc::now().to_rfc3339(),
3938                traceparent: ctx.traceparent(),
3939            },
3940            route_id: ctx.route_id.clone(),
3941            upstream_id: ctx.upstream.clone(),
3942            request_body: Some(body_for_inspection.clone()),
3943            response_body: None,
3944        };
3945
3946        let agent_ids = ctx.body_inspection_agents.clone();
3947        match self
3948            .agent_manager
3949            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3950            .await
3951        {
3952            Ok(decision) => {
3953                if !decision.is_allow() {
3954                    warn!(
3955                        correlation_id = %ctx.trace_id,
3956                        action = ?decision.action,
3957                        "Agent blocked request body"
3958                    );
3959                    self.metrics.record_blocked_request("agent_body_inspection");
3960
3961                    let (status, message) = match &decision.action {
3962                        crate::agents::AgentAction::Block { status, body, .. } => (
3963                            *status,
3964                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
3965                        ),
3966                        _ => (403, "Forbidden".to_string()),
3967                    };
3968
3969                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3970                }
3971
3972                trace!(
3973                    correlation_id = %ctx.trace_id,
3974                    "Agent allowed request body"
3975                );
3976            }
3977            Err(e) => {
3978                let fail_closed = ctx
3979                    .route_config
3980                    .as_ref()
3981                    .map(|r| r.policies.failure_mode == grapsus_config::FailureMode::Closed)
3982                    .unwrap_or(false);
3983
3984                if fail_closed {
3985                    error!(
3986                        correlation_id = %ctx.trace_id,
3987                        error = %e,
3988                        "Agent body inspection failed, blocking (fail-closed)"
3989                    );
3990                    self.log_manager.log_request_error(
3991                        "error",
3992                        "Agent body inspection failed, blocking (fail-closed)",
3993                        &ctx.trace_id,
3994                        ctx.route_id.as_deref(),
3995                        ctx.upstream.as_deref(),
3996                        Some(format!("error={}", e)),
3997                    );
3998                    return Err(Error::explain(
3999                        ErrorType::HTTPStatus(503),
4000                        "Service unavailable",
4001                    ));
4002                } else {
4003                    warn!(
4004                        correlation_id = %ctx.trace_id,
4005                        error = %e,
4006                        "Agent body inspection failed, allowing (fail-open)"
4007                    );
4008                    self.log_manager.log_request_error(
4009                        "warn",
4010                        "Agent body inspection failed, allowing (fail-open)",
4011                        &ctx.trace_id,
4012                        ctx.route_id.as_deref(),
4013                        ctx.upstream.as_deref(),
4014                        Some(format!("error={}", e)),
4015                    );
4016                }
4017            }
4018        }
4019
4020        Ok(())
4021    }
4022}