sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.
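//!
//! Hooks implemented here include `early_request_filter`, `upstream_peer`, `request_filter`,
//! `request_body_filter`, `upstream_request_filter`, `response_filter`, `response_body_filter`,
//! and `fail_to_connect`; per-request state is carried in [`RequestContext`].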

use async_trait::async_trait;
use bytes::Bytes;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora::protocols::Digest;
use pingora::proxy::{ProxyHttp, Session};
use pingora::upstreams::peer::Peer;
use pingora_cache::{
    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
};
use pingora_timeout::sleep;
use std::os::unix::io::RawFd;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
use crate::rate_limit::HeaderAccessor;
use crate::routing::RequestInfo;

use super::context::RequestContext;
use super::SentinelProxy;

/// Helper type for rate limiting when we don't need header access
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}

#[async_trait]
impl ProxyHttp for SentinelProxy {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::new()
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        error!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
            peer_address = %peer.address(),
            error = %e,
            "Failed to connect to upstream peer"
        );
        // Custom error pages are handled in response_filter
        e
    }

    /// Early request filter - runs before upstream selection
    /// Used to handle builtin routes that don't need an upstream connection
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // Regular route with upstream
        if let Some(ref upstream) = route_match.config.upstream {
            ctx.upstream = Some(upstream.clone());
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                upstream = %upstream,
                "Upstream configured for route"
            );
        } else {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route has no upstream configured"
            );
            return Err(Error::explain(
                ErrorType::InternalError,
                format!(
                    "Route '{}' has no upstream configured",
                    route_match.route_id
                ),
            ));
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer(None).await {
                Ok(peer) => {
                    let selection_duration = selection_start.elapsed();
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer.address(),
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency)
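                        // 100ms * 2^(attempt-1): 100ms after attempt 1, 200ms after attempt 2, 400ms after attempt 3, ...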
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }

    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool, Box<Error>> {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Starting request filter phase"
        );

        // Check rate limiting early (before other processing)
        // Fast path: skip if no rate limiting is configured for this route
        if let Some(route_id) = ctx.route_id.as_deref() {
            if self.rate_limit_manager.has_route_limiter(route_id) {
                let rate_result = self.rate_limit_manager.check(
                    route_id,
                    &ctx.client_ip,
                    &ctx.path,
                    Option::<&NoHeaderAccessor>::None,
                );

                // Store rate limit info for response headers (even if allowed)
                if rate_result.limit > 0 {
                    ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
                        limit: rate_result.limit,
                        remaining: rate_result.remaining,
                        reset_at: rate_result.reset_at,
                    });
                }

                if !rate_result.allowed {
                    use sentinel_config::RateLimitAction;

                    match rate_result.action {
                        RateLimitAction::Reject => {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                limiter = %rate_result.limiter,
                                limit = rate_result.limit,
                                remaining = rate_result.remaining,
                                "Request rate limited"
                            );
                            self.metrics.record_blocked_request("rate_limited");

                            // Audit log the rate limit
                            let audit_entry = AuditLogEntry::rate_limited(
                                &ctx.trace_id,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                                &rate_result.limiter,
                            )
                            .with_route_id(route_id)
                            .with_status_code(rate_result.status_code);
                            self.log_manager.log_audit(&audit_entry);

                            // Send rate limit response with headers
                            let body = rate_result
                                .message
                                .unwrap_or_else(|| "Rate limit exceeded".to_string());

                            // Build response with rate limit headers
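                            // reset_at is an absolute Unix timestamp in seconds; Retry-After is the
                            // number of seconds remaining until then (saturating to 0 if already past)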
                            let retry_after = rate_result.reset_at.saturating_sub(
                                std::time::SystemTime::now()
                                    .duration_since(std::time::UNIX_EPOCH)
                                    .unwrap_or_default()
                                    .as_secs(),
                            );
                            crate::http_helpers::write_rate_limit_error(
                                session,
                                rate_result.status_code,
                                &body,
                                rate_result.limit,
                                rate_result.remaining,
                                rate_result.reset_at,
                                retry_after,
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }
                        RateLimitAction::LogOnly => {
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit exceeded (log only mode)"
                            );
                            // Continue processing
                        }
                        RateLimitAction::Delay => {
                            // Apply delay if suggested by rate limiter
                            if let Some(delay_ms) = rate_result.suggested_delay_ms {
                                // Cap delay at the configured maximum
                                let actual_delay = delay_ms.min(rate_result.max_delay_ms);

                                if actual_delay > 0 {
                                    debug!(
                                        correlation_id = %ctx.trace_id,
                                        route_id = route_id,
                                        suggested_delay_ms = delay_ms,
                                        max_delay_ms = rate_result.max_delay_ms,
                                        actual_delay_ms = actual_delay,
                                        "Applying rate limit delay"
                                    );

                                    tokio::time::sleep(std::time::Duration::from_millis(
                                        actual_delay,
                                    ))
                                    .await;
                                }
                            }
                            // Continue processing after delay
                        }
                    }
                }
            }
        }

        // Geo filtering
        if let Some(route_id) = ctx.route_id.as_deref() {
            if let Some(ref route_config) = ctx.route_config {
                for filter_id in &route_config.filters {
                    if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
                        // Store country code for response header
                        ctx.geo_country_code = result.country_code.clone();
                        ctx.geo_lookup_performed = true;

                        if !result.allowed {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                country = ?result.country_code,
                                filter_id = %filter_id,
                                "Request blocked by geo filter"
                            );
                            self.metrics.record_blocked_request("geo_blocked");

                            // Audit log the geo block
                            let audit_entry = AuditLogEntry::new(
                                &ctx.trace_id,
                                AuditEventType::Blocked,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                            )
                            .with_route_id(route_id)
                            .with_status_code(result.status_code)
                            .with_reason(format!(
                                "Geo blocked: country={}, filter={}",
                                result.country_code.as_deref().unwrap_or("unknown"),
                                filter_id
                            ));
                            self.log_manager.log_audit(&audit_entry);

                            // Send geo block response
                            let body = result
                                .block_message
                                .unwrap_or_else(|| "Access denied".to_string());

                            crate::http_helpers::write_error(
                                session,
                                result.status_code,
                                &body,
                                "text/plain",
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }

                        // Only check first geo filter that matches
                        break;
                    }
                }
            }
        }

        // Check for WebSocket upgrade requests
        let is_websocket_upgrade = session
            .req_header()
            .headers
            .get(http::header::UPGRADE)
            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
            .unwrap_or(false);

        if is_websocket_upgrade {
            ctx.is_websocket_upgrade = true;

            // Check if route allows WebSocket upgrades
            if let Some(ref route_config) = ctx.route_config {
                if !route_config.websocket {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                        client_ip = %ctx.client_ip,
                        "WebSocket upgrade rejected: not enabled for route"
                    );

                    self.metrics.record_blocked_request("websocket_not_enabled");

                    // Audit log the rejection
                    let audit_entry = AuditLogEntry::new(
                        &ctx.trace_id,
                        AuditEventType::Blocked,
                        &ctx.method,
                        &ctx.path,
                        &ctx.client_ip,
                    )
                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
                    .with_action("websocket_rejected")
                    .with_reason("WebSocket not enabled for route");
                    self.log_manager.log_audit(&audit_entry);

                    // Send 403 Forbidden response
                    crate::http_helpers::write_error(
                        session,
                        403,
                        "WebSocket not enabled for this route",
                        "text/plain",
                    )
                    .await?;
                    return Ok(true); // Request complete, don't continue
                }

                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade request allowed"
                );

                // Check for WebSocket frame inspection
                if route_config.websocket_inspection {
                    // Check for compression negotiation - skip inspection if permessage-deflate
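                    // (with permessage-deflate, frame payloads are DEFLATE-compressed, so
                    // byte-level payload inspection would only see compressed data)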
                    let has_compression = session
                        .req_header()
                        .headers
                        .get("Sec-WebSocket-Extensions")
                        .and_then(|v| v.to_str().ok())
                        .map(|s| s.contains("permessage-deflate"))
                        .unwrap_or(false);

                    if has_compression {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            "WebSocket inspection skipped: permessage-deflate negotiated"
                        );
                        ctx.websocket_skip_inspection = true;
                    } else {
                        ctx.websocket_inspection_enabled = true;

                        // Get agents that handle WebSocketFrame events
                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
                            sentinel_agent_protocol::EventType::WebSocketFrame,
                        );

                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            agent_count = ctx.websocket_inspection_agents.len(),
                            "WebSocket frame inspection enabled"
                        );
                    }
                }
            }
        }

        // Use the route config cached when the route was first matched (avoids a second
        // route-matching pass). Handle static file and builtin routes here.
        if let Some(route_config) = ctx.route_config.clone() {
            if route_config.service_type == sentinel_config::ServiceType::Static {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "Handling static file route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_static_route(session, ctx, &route_match).await;
            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    builtin_handler = ?route_config.builtin_handler,
                    "Handling builtin route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_builtin_route(session, ctx, &route_match).await;
            }
        }

        // API validation for API routes
        if let Some(route_id) = ctx.route_id.clone() {
            if let Some(validator) = self.validators.get(&route_id).await {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_id,
                    "Running API schema validation"
                );
                if let Some(result) = self
                    .validate_api_request(session, ctx, &route_id, &validator)
                    .await?
                {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_id,
                        validation_passed = result,
                        "API validation complete"
                    );
                    return Ok(result);
                }
            }
        }

        // Get client address before mutable borrow
        let client_addr = session
            .client_addr()
            .map(|a| format!("{}", a))
            .unwrap_or_else(|| "unknown".to_string());
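        // Note: the client port is not extracted from the socket address here; 0 is passed
        // along as a placeholder.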
        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);

        let req_header = session.req_header_mut();

        // Add correlation ID header
        req_header
            .insert_header("X-Correlation-Id", &ctx.trace_id)
            .ok();
        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();

        // Use cached config (set in upstream_peer, or fetch now if needed)
        let config = ctx
            .config
            .get_or_insert_with(|| self.config_manager.current());

        // Enforce header limits (fast path: skip if limits are very high)
        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // 1MB = effectively unlimited
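        // Limits configured at or above this threshold are treated as unlimited and the
        // corresponding check is skipped entirely.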

        // Header count check - O(1)
        let header_count = req_header.headers.len();
        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
            && header_count > config.limits.max_header_count
        {
            warn!(
                correlation_id = %ctx.trace_id,
                header_count = header_count,
                limit = config.limits.max_header_count,
                "Request blocked: exceeds header count limit"
            );

            self.metrics.record_blocked_request("header_count_exceeded");
            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
        }

        // Header size check - O(n), skip if limit is very high
        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
            let total_header_size: usize = req_header
                .headers
                .iter()
                .map(|(k, v)| k.as_str().len() + v.len())
                .sum();

            if total_header_size > config.limits.max_header_size_bytes {
                warn!(
                    correlation_id = %ctx.trace_id,
                    header_size = total_header_size,
                    limit = config.limits.max_header_size_bytes,
                    "Request blocked: exceeds header size limit"
                );

                self.metrics.record_blocked_request("header_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Headers too large",
                ));
            }
        }

        // Process through external agents
        trace!(
            correlation_id = %ctx.trace_id,
            "Processing request through agents"
        );
        if let Err(e) = self
            .process_agents(session, ctx, &client_addr, client_port)
            .await
        {
            // Check if this is an HTTPStatus error (e.g., agent block or fail-closed)
            // In that case, we need to send a proper HTTP response instead of just closing the connection
            if let ErrorType::HTTPStatus(status) = e.etype() {
                // Extract the message from the error (the context part after "HTTPStatus context:")
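                // If the error string contains "context: <message>", that message becomes the
                // response body; otherwise fall back to the generic "Request blocked".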
                let error_msg = e.to_string();
                let body = error_msg
                    .split("context:")
                    .nth(1)
                    .map(|s| s.trim())
                    .unwrap_or("Request blocked");
                debug!(
                    correlation_id = %ctx.trace_id,
                    status = status,
                    body = %body,
                    "Sending HTTP error response for agent block"
                );
                crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
                return Ok(true); // Request complete, don't continue to upstream
            }
            // For other errors, propagate them
            return Err(e);
        }

        trace!(
            correlation_id = %ctx.trace_id,
            "Request filter phase complete, forwarding to upstream"
        );

        Ok(false) // Continue processing
    }

    /// Process incoming request body chunks.
    /// Used for body size enforcement and WAF/agent inspection.
    ///
    /// Supports three modes:
    /// - **Buffer mode** (default): buffer chunks until end of stream or the inspection limit, then send to agents
    /// - **Stream mode**: send each chunk immediately to agents as it arrives
    /// - **Hybrid mode**: buffer up to a configured threshold, then stream the remainder
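    ///
    /// The mode is taken from `ctx.request_body_streaming_mode`; for example,
    /// `BodyStreamingMode::Hybrid { buffer_threshold: 8192 }` buffers the first 8 KiB of the body
    /// and streams the rest.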
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        use sentinel_config::BodyStreamingMode;

        // Handle WebSocket frame inspection (client -> server)
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let result = handler.process_client_data(body.take()).await;
                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (client->server)"
                        );
                        // Return an error to close the connection
                        return Err(Error::explain(
                            ErrorType::InternalError,
                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
                        ));
                    }
                }
            }
            // Skip normal body processing for WebSocket
            return Ok(());
        }

        // Track request body size
        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
        if chunk_len > 0 {
            ctx.request_body_bytes += chunk_len as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk_len,
                total_body_bytes = ctx.request_body_bytes,
                end_of_stream = end_of_stream,
                streaming_mode = ?ctx.request_body_streaming_mode,
                "Processing request body chunk"
            );

            // Check body size limit (use cached config)
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
                warn!(
                    correlation_id = %ctx.trace_id,
                    body_bytes = ctx.request_body_bytes,
                    limit = config.limits.max_body_size_bytes,
                    "Request body size limit exceeded"
                );
                self.metrics.record_blocked_request("body_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Request body too large",
                ));
            }
        }

        // Body inspection for agents (WAF, etc.)
        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            let max_inspection_bytes = config
                .waf
                .as_ref()
                .map(|w| w.body_inspection.max_inspection_bytes as u64)
                .unwrap_or(1024 * 1024);

            match ctx.request_body_streaming_mode {
                BodyStreamingMode::Stream => {
                    // Stream mode: send each chunk immediately
                    if body.is_some() {
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    } else if end_of_stream && ctx.agent_needs_more {
                        // Send final empty chunk to signal end
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Hybrid { buffer_threshold } => {
                    // Hybrid mode: buffer up to threshold, then stream
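                    // e.g. with buffer_threshold = 8192, the first 8 KiB is buffered and flushed to
                    // the agents as one payload; the rest of that chunk and all later chunks are streamed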
                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
                        // Still in buffering phase
                        if let Some(ref chunk) = body {
                            let bytes_to_buffer = std::cmp::min(
                                chunk.len(),
                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
                            );
                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
                            ctx.body_bytes_inspected += bytes_to_buffer as u64;

                            // If we've reached threshold or end of stream, switch to streaming
                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
                            {
                                // Send buffered content first
                                self.send_buffered_body_to_agents(
                                    end_of_stream && chunk.len() == bytes_to_buffer,
                                    ctx,
                                )
                                .await?;
                                ctx.body_buffer.clear();

                                // If there's remaining data in this chunk, stream it
                                if bytes_to_buffer < chunk.len() {
                                    let remaining = chunk.slice(bytes_to_buffer..);
                                    let mut remaining_body = Some(remaining);
                                    self.process_body_chunk_streaming(
                                        &mut remaining_body,
                                        end_of_stream,
                                        ctx,
                                    )
                                    .await?;
                                }
                            }
                        }
                    } else {
                        // Past threshold, stream directly
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Buffer => {
                    // Buffer mode: collect chunks until ready to send
                    if let Some(ref chunk) = body {
                        if ctx.body_bytes_inspected < max_inspection_bytes {
                            let bytes_to_inspect = std::cmp::min(
                                chunk.len() as u64,
                                max_inspection_bytes - ctx.body_bytes_inspected,
                            ) as usize;

                            ctx.body_buffer
                                .extend_from_slice(&chunk[..bytes_to_inspect]);
                            ctx.body_bytes_inspected += bytes_to_inspect as u64;

                            trace!(
                                correlation_id = %ctx.trace_id,
                                bytes_inspected = ctx.body_bytes_inspected,
                                max_inspection_bytes = max_inspection_bytes,
                                buffer_size = ctx.body_buffer.len(),
                                "Buffering body for agent inspection"
                            );
                        }
                    }

                    // Send when complete or limit reached
                    let should_send =
                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
                    if should_send && !ctx.body_buffer.is_empty() {
                        self.send_buffered_body_to_agents(end_of_stream, ctx)
                            .await?;
                        ctx.body_buffer.clear();
                    }
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_body_bytes = ctx.request_body_bytes,
                bytes_inspected = ctx.body_bytes_inspected,
                "Request body complete"
            );
        }

        Ok(())
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let status = upstream_response.status.as_u16();
        let duration = ctx.elapsed();

        trace!(
            correlation_id = %ctx.trace_id,
            status = status,
            "Starting response filter phase"
        );

        // Handle WebSocket 101 Switching Protocols
        if status == 101 && ctx.is_websocket_upgrade {
            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
                // Create WebSocket inspector and handler with metrics
                let inspector = crate::websocket::WebSocketInspector::with_metrics(
                    self.agent_manager.clone(),
                    ctx.route_id
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string()),
                    ctx.trace_id.clone(),
                    ctx.client_ip.clone(),
                    100, // 100ms timeout per frame inspection
                    Some(self.metrics.clone()),
                );

                let handler = crate::websocket::WebSocketHandler::new(
                    std::sync::Arc::new(inspector),
                    1024 * 1024, // 1MB max frame size
                );

                ctx.websocket_handler = Some(std::sync::Arc::new(handler));

                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    agent_count = ctx.websocket_inspection_agents.len(),
                    "WebSocket upgrade successful, frame inspection enabled"
                );
            } else if ctx.websocket_skip_inspection {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
                );
            } else {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful"
                );
            }
        }

        // Apply security headers
        trace!(
            correlation_id = %ctx.trace_id,
            "Applying security headers"
        );
        self.apply_security_headers(upstream_response).ok();

        // Add correlation ID to response
        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;

        // Add rate limit headers if rate limiting was applied
        if let Some(ref rate_info) = ctx.rate_limit_info {
            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
            upstream_response
                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
        }

        // Add GeoIP country header if geo lookup was performed
        if let Some(ref country_code) = ctx.geo_country_code {
            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
        }

        // Generate custom error pages for error responses
        if status >= 400 {
            trace!(
                correlation_id = %ctx.trace_id,
                status = status,
                "Handling error response"
            );
            self.handle_error_response(upstream_response, ctx).await?;
        }

        // Record metrics
        self.metrics.record_request(
            ctx.route_id.as_deref().unwrap_or("unknown"),
            &ctx.method,
            status,
            duration,
        );

        // Record passive health check
        if let Some(ref upstream) = ctx.upstream {
            let success = status < 500;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream,
                success = success,
                status = status,
                "Recording passive health check result"
            );

            let error_msg = if !success {
                Some(format!("HTTP {}", status))
            } else {
                None
            };
            self.passive_health
                .record_outcome(upstream, success, error_msg.as_deref())
                .await;

            // Report to upstream pool
            if let Some(pool) = self.upstream_pools.get(upstream).await {
                pool.report_result(upstream, success).await;
            }

            if !success {
                warn!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream,
                    status = status,
                    "Upstream returned error status"
                );
            }
        }

        // Final request completion log
        if status >= 500 {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed with server error"
            );
        } else if status >= 400 {
            warn!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                "Request completed with client error"
            );
        } else {
            debug!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed"
            );
        }

        Ok(())
    }

    /// Modify the request before sending to upstream.
    /// Used for header modifications, adding authentication, etc.
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut pingora::http::RequestHeader,
        ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Applying upstream request modifications"
        );

        // Add trace ID header for upstream correlation
        upstream_request
            .insert_header("X-Trace-Id", &ctx.trace_id)
            .ok();

        // Add request metadata headers
        upstream_request
            .insert_header("X-Forwarded-By", "Sentinel")
            .ok();

        // Apply route-specific request header modifications
        // Clone the modifications to avoid lifetime issues with the header API
        if let Some(ref route_config) = ctx.route_config {
            let mods = route_config.policies.request_headers.clone();

            // Set headers (overwrite existing)
            for (name, value) in mods.set {
                upstream_request.insert_header(name, value).ok();
            }

            // Add headers (append)
            for (name, value) in mods.add {
                upstream_request.append_header(name, value).ok();
            }

            // Remove headers
            for name in &mods.remove {
                upstream_request.remove_header(name);
            }

            trace!(
                correlation_id = %ctx.trace_id,
                "Applied request header modifications"
            );
        }

        // Remove sensitive headers that shouldn't go to upstream
        upstream_request.remove_header("X-Internal-Token");
        upstream_request.remove_header("Authorization-Internal");

        Ok(())
    }

    /// Process response body chunks from upstream.
    /// Used for response size tracking and WAF inspection.
    ///
    /// Note: Response body inspection is currently buffered only (streaming mode not supported
    /// for responses due to Pingora's synchronous filter design).
    fn response_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>, Box<Error>> {
        // Handle WebSocket frame inspection (server -> client)
        // Note: This filter is synchronous, so we use block_in_place for async agent calls
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let handler = handler.clone();
                let data = body.take();

                // Use block_in_place to run async handler from sync context
                // This is safe because Pingora uses a multi-threaded tokio runtime
1316                let result = tokio::task::block_in_place(|| {
1317                    tokio::runtime::Handle::current()
1318                        .block_on(async { handler.process_server_data(data).await })
1319                });
1320
1321                match result {
1322                    crate::websocket::ProcessResult::Forward(data) => {
1323                        *body = data;
1324                    }
1325                    crate::websocket::ProcessResult::Close(reason) => {
1326                        warn!(
1327                            correlation_id = %ctx.trace_id,
1328                            code = reason.code,
1329                            reason = %reason.reason,
1330                            "WebSocket connection closed by agent (server->client)"
1331                        );
1332                        // For sync filter, we can't return an error that closes the connection
1333                        // Instead, inject a close frame
1334                        let close_frame =
1335                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
1336                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
1337                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
1338                            *body = Some(Bytes::from(encoded));
1339                        }
1340                    }
1341                }
1342            }
1343            // Skip normal body processing for WebSocket
1344            return Ok(None);
1345        }
1346
1347        // Track response body size
1348        if let Some(ref chunk) = body {
1349            ctx.response_bytes += chunk.len() as u64;
1350
1351            trace!(
1352                correlation_id = %ctx.trace_id,
1353                chunk_size = chunk.len(),
1354                total_response_bytes = ctx.response_bytes,
1355                end_of_stream = end_of_stream,
1356                "Processing response body chunk"
1357            );
1358
1359            // Response body inspection (buffered mode only)
1360            // Note: Streaming mode for response bodies is not currently supported
1361            // due to Pingora's synchronous response_body_filter design
1362            if ctx.response_body_inspection_enabled
1363                && !ctx.response_body_inspection_agents.is_empty()
1364            {
1365                let config = ctx
1366                    .config
1367                    .get_or_insert_with(|| self.config_manager.current());
1368                let max_inspection_bytes = config
1369                    .waf
1370                    .as_ref()
1371                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
1372                    .unwrap_or(1024 * 1024);
1373
1374                if ctx.response_body_bytes_inspected < max_inspection_bytes {
1375                    let bytes_to_inspect = std::cmp::min(
1376                        chunk.len() as u64,
1377                        max_inspection_bytes - ctx.response_body_bytes_inspected,
1378                    ) as usize;
1379
1380                    // Track inspection progress only; the chunk itself is not buffered
1381                    // here. Inspection results are reported asynchronously (during the
1382                    // logging phase) rather than blocking the response.
1383                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
1384                    ctx.response_body_chunk_index += 1;
1385
1386                    trace!(
1387                        correlation_id = %ctx.trace_id,
1388                        bytes_inspected = ctx.response_body_bytes_inspected,
1389                        max_inspection_bytes = max_inspection_bytes,
1390                        chunk_index = ctx.response_body_chunk_index,
1391                        "Tracking response body for inspection"
1392                    );
1393                }
1394            }
1395        }
1396
1397        if end_of_stream {
1398            trace!(
1399                correlation_id = %ctx.trace_id,
1400                total_response_bytes = ctx.response_bytes,
1401                response_bytes_inspected = ctx.response_body_bytes_inspected,
1402                "Response body complete"
1403            );
1404        }
1405
1406        // Return None to indicate no delay needed
1407        Ok(None)
1408    }
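
    // The per-chunk inspection cap applied above, as a self-contained sketch that
    // mirrors the min() arithmetic in this filter (`bytes_to_inspect` is a
    // hypothetical helper, not part of this module):
    //
    //     fn bytes_to_inspect(chunk_len: u64, already_inspected: u64, max: u64) -> u64 {
    //         if already_inspected >= max {
    //             0
    //         } else {
    //             chunk_len.min(max - already_inspected)
    //         }
    //     }
    //
    //     // bytes_to_inspect(64 * 1024, 1_040_000, 1_048_576) == 8_576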
1409
1410    /// Called when a connection to upstream is established or reused.
1411    /// Logs connection reuse statistics for observability.
1412    async fn connected_to_upstream(
1413        &self,
1414        _session: &mut Session,
1415        reused: bool,
1416        peer: &HttpPeer,
1417        #[cfg(unix)] _fd: RawFd,
1418        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
1419        digest: Option<&Digest>,
1420        ctx: &mut Self::CTX,
1421    ) -> Result<(), Box<Error>> {
1422        // Track connection reuse for metrics
1423        ctx.connection_reused = reused;
1424
1425        // Log connection establishment/reuse
1426        if reused {
1427            trace!(
1428                correlation_id = %ctx.trace_id,
1429                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1430                peer_address = %peer.address(),
1431                "Reusing existing upstream connection"
1432            );
1433        } else {
1434            debug!(
1435                correlation_id = %ctx.trace_id,
1436                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1437                peer_address = %peer.address(),
1438                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
1439                "Established new upstream connection"
1440            );
1441        }
1442
1443        Ok(())
1444    }
1445
1446    // =========================================================================
1447    // HTTP Caching - Pingora Cache Integration
1448    // =========================================================================
1449
1450    /// Decide if the request should use caching.
1451    ///
1452    /// This runs early in the request lifecycle and determines both whether the
1453    /// response may be served from cache and whether the upstream response
1454    /// should be stored in the cache.
1455    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1456        // Check if route has caching enabled
1457        let route_id = match ctx.route_id.as_deref() {
1458            Some(id) => id,
1459            None => {
1460                trace!(
1461                    correlation_id = %ctx.trace_id,
1462                    "Cache filter: no route ID, skipping cache"
1463                );
1464                return Ok(());
1465            }
1466        };
1467
1468        // Check if caching is enabled for this route
1469        if !self.cache_manager.is_enabled(route_id) {
1470            trace!(
1471                correlation_id = %ctx.trace_id,
1472                route_id = %route_id,
1473                "Cache disabled for route"
1474            );
1475            return Ok(());
1476        }
1477
1478        // Check if method is cacheable (typically GET/HEAD)
1479        if !self
1480            .cache_manager
1481            .is_method_cacheable(route_id, &ctx.method)
1482        {
1483            trace!(
1484                correlation_id = %ctx.trace_id,
1485                route_id = %route_id,
1486                method = %ctx.method,
1487                "Method not cacheable"
1488            );
1489            return Ok(());
1490        }
1491
1492        // Enable caching for this request using Pingora's cache infrastructure
1493        debug!(
1494            correlation_id = %ctx.trace_id,
1495            route_id = %route_id,
1496            method = %ctx.method,
1497            path = %ctx.path,
1498            "Enabling HTTP caching for request"
1499        );
1500
1501        // Get static references to cache infrastructure
1502        let storage = get_cache_storage();
1503        let eviction = get_cache_eviction();
1504        let cache_lock = get_cache_lock();
1505
1506        // Enable the cache with storage, eviction, and lock
1507        session.cache.enable(
1508            storage,
1509            Some(eviction),
1510            None, // predictor - optional
1511            Some(cache_lock),
1512            None, // option overrides
1513        );
1514
1515        // Mark request as cache-eligible in context
1516        ctx.cache_eligible = true;
1517
1518        trace!(
1519            correlation_id = %ctx.trace_id,
1520            route_id = %route_id,
1521            cache_enabled = session.cache.enabled(),
1522            "Cache enabled for request"
1523        );
1524
1525        Ok(())
1526    }
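
    // The gating above, condensed: caching is attempted only when a route matched,
    // the route has caching enabled, and the method is cacheable. A hypothetical
    // sketch (the real checks go through CacheManager):
    //
    //     fn cache_eligible(
    //         route_id: Option<&str>,
    //         route_cache_enabled: impl Fn(&str) -> bool,
    //         method_cacheable: impl Fn(&str) -> bool,
    //     ) -> bool {
    //         route_id.is_some_and(|id| route_cache_enabled(id) && method_cacheable(id))
    //     }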
1527
1528    /// Generate the cache key for this request.
1529    ///
1530    /// The cache key uniquely identifies the cached response. It typically
1531    /// includes the method, host, path, and potentially query parameters.
1532    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1533        let req_header = session.req_header();
1534        let method = req_header.method.as_str();
1535        let path = req_header.uri.path();
1536        let host = ctx.host.as_deref().unwrap_or("unknown");
1537        let query = req_header.uri.query();
1538
1539        // Generate cache key using our cache manager
1540        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1541
1542        trace!(
1543            correlation_id = %ctx.trace_id,
1544            cache_key = %key_string,
1545            "Generated cache key"
1546        );
1547
1548        // The string key above is only logged here; cache_hit_filter regenerates it
1549        // for purge matching. The returned key uses Pingora's default generator.
1550        Ok(CacheKey::default(req_header))
1551    }
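
    // One plausible layout for the composite key string logged above. The actual
    // format is defined by CacheManager::generate_cache_key and may differ; this
    // is an assumption for illustration only:
    //
    //     fn composite_key(method: &str, host: &str, path: &str, query: Option<&str>) -> String {
    //         match query {
    //             Some(q) => format!("{method}:{host}{path}?{q}"),
    //             None => format!("{method}:{host}{path}"),
    //         }
    //     }
    //
    //     // composite_key("GET", "example.com", "/api/items", Some("page=2"))
    //     //   == "GET:example.com/api/items?page=2"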
1552
1553    /// Called when a cache miss occurs.
1554    ///
1555    /// The cache lookup found no matching entry; record the miss and let
1556    /// Pingora proceed to fetch the response from upstream.
1557    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1558        // Let Pingora handle the cache miss
1559        session.cache.cache_miss();
1560
1561        // Track statistics
1562        if let Some(route_id) = ctx.route_id.as_deref() {
1563            self.cache_manager.stats().record_miss();
1564
1565            trace!(
1566                correlation_id = %ctx.trace_id,
1567                route_id = %route_id,
1568                path = %ctx.path,
1569                "Cache miss"
1570            );
1571        }
1572    }
1573
1574    /// Called after a successful cache lookup.
1575    ///
1576    /// This filter allows inspecting the cached response before serving it.
1577    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
1578    /// to invalidate and refetch.
1579    async fn cache_hit_filter(
1580        &self,
1581        session: &mut Session,
1582        meta: &CacheMeta,
1583        _hit_handler: &mut HitHandler,
1584        is_fresh: bool,
1585        ctx: &mut Self::CTX,
1586    ) -> Result<Option<ForcedInvalidationKind>>
1587    where
1588        Self::CTX: Send + Sync,
1589    {
1590        // Check if this cache entry should be invalidated due to a purge request
1591        let req_header = session.req_header();
1592        let method = req_header.method.as_str();
1593        let path = req_header.uri.path();
1594        let host = ctx.host.as_deref().unwrap_or("localhost"); // origin-form URIs rarely carry a host; use the Host header
1595        let query = req_header.uri.query();
1596
1597        // Generate the cache key for this request
1598        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1599
1600        // Check if this key should be invalidated
1601        if self.cache_manager.should_invalidate(&cache_key) {
1602            info!(
1603                correlation_id = %ctx.trace_id,
1604                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1605                cache_key = %cache_key,
1606                "Cache entry invalidated by purge request"
1607            );
1608            // Force expiration so the entry is refetched from upstream
1609            return Ok(Some(ForcedInvalidationKind::ForceExpired));
1610        }
1611
1612        // Track cache hit statistics
1613        if is_fresh {
1614            self.cache_manager.stats().record_hit();
1615
1616            debug!(
1617                correlation_id = %ctx.trace_id,
1618                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1619                is_fresh = is_fresh,
1620                "Cache hit (fresh)"
1621            );
1622        } else {
1623            trace!(
1624                correlation_id = %ctx.trace_id,
1625                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1626                is_fresh = is_fresh,
1627                "Cache hit (stale)"
1628            );
1629        }
1630
1631        // Serve the cached response without invalidation
1632        Ok(None)
1633    }
1634
1635    /// Decide if the response should be cached.
1636    ///
1637    /// Called after receiving the response from upstream to determine
1638    /// if it should be stored in the cache.
1639    fn response_cache_filter(
1640        &self,
1641        _session: &Session,
1642        resp: &ResponseHeader,
1643        ctx: &mut Self::CTX,
1644    ) -> Result<RespCacheable> {
1645        let route_id = match ctx.route_id.as_deref() {
1646            Some(id) => id,
1647            None => {
1648                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1649                    "no_route",
1650                )));
1651            }
1652        };
1653
1654        // Check if caching is enabled for this route
1655        if !self.cache_manager.is_enabled(route_id) {
1656            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1657                "disabled",
1658            )));
1659        }
1660
1661        let status = resp.status.as_u16();
1662
1663        // Check if status code is cacheable
1664        if !self.cache_manager.is_status_cacheable(route_id, status) {
1665            trace!(
1666                correlation_id = %ctx.trace_id,
1667                route_id = %route_id,
1668                status = status,
1669                "Status code not cacheable"
1670            );
1671            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1672        }
1673
1674        // Check Cache-Control header for no-store, no-cache, private
1675        if let Some(cache_control) = resp.headers.get("cache-control") {
1676            if let Ok(cc_str) = cache_control.to_str() {
1677                if crate::cache::CacheManager::is_no_cache(cc_str) {
1678                    trace!(
1679                        correlation_id = %ctx.trace_id,
1680                        route_id = %route_id,
1681                        cache_control = %cc_str,
1682                        "Response has no-cache directive"
1683                    );
1684                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1685                }
1686            }
1687        }
1688
1689        // Calculate TTL from Cache-Control or use default
1690        let cache_control = resp
1691            .headers
1692            .get("cache-control")
1693            .and_then(|v| v.to_str().ok());
1694        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1695
1696        if ttl.is_zero() {
1697            trace!(
1698                correlation_id = %ctx.trace_id,
1699                route_id = %route_id,
1700                "TTL is zero, not caching"
1701            );
1702            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1703        }
1704
1705        // Get route cache config for stale settings
1706        let config = self
1707            .cache_manager
1708            .get_route_config(route_id)
1709            .unwrap_or_default();
1710
1711        // Create timestamps for cache metadata
1712        let now = std::time::SystemTime::now();
1713        let fresh_until = now + ttl;
1714
1715        // Clone the response header for storage
1716        let header = resp.clone();
1717
1718        // Create CacheMeta with proper timestamps and TTLs
1719        let cache_meta = CacheMeta::new(
1720            fresh_until,
1721            now,
1722            config.stale_while_revalidate_secs as u32,
1723            config.stale_if_error_secs as u32,
1724            header,
1725        );
1726
1727        // Track the cache store
1728        self.cache_manager.stats().record_store();
1729
1730        debug!(
1731            correlation_id = %ctx.trace_id,
1732            route_id = %route_id,
1733            status = status,
1734            ttl_secs = ttl.as_secs(),
1735            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1736            stale_if_error_secs = config.stale_if_error_secs,
1737            "Caching response"
1738        );
1739
1740        Ok(RespCacheable::Cacheable(cache_meta))
1741    }
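
    // A sketch of how a TTL could be derived from Cache-Control with a route
    // default fallback. It assumes calculate_ttl honors max-age/s-maxage; the
    // real precedence rules and directive handling live in CacheManager:
    //
    //     fn ttl_from_cache_control(cache_control: Option<&str>, default_ttl: Duration) -> Duration {
    //         let Some(cc) = cache_control else {
    //             return default_ttl;
    //         };
    //         for directive in cc.split(',') {
    //             let directive = directive.trim();
    //             // first explicit freshness directive wins in this sketch
    //             if let Some(secs) = directive
    //                 .strip_prefix("s-maxage=")
    //                 .or_else(|| directive.strip_prefix("max-age="))
    //             {
    //                 if let Ok(secs) = secs.parse::<u64>() {
    //                     return Duration::from_secs(secs);
    //                 }
    //             }
    //         }
    //         default_ttl
    //     }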
1742
1743    /// Decide whether to serve stale content on error or during revalidation.
1744    ///
1745    /// This implements stale-while-revalidate and stale-if-error semantics.
1746    fn should_serve_stale(
1747        &self,
1748        _session: &mut Session,
1749        ctx: &mut Self::CTX,
1750        error: Option<&Error>,
1751    ) -> bool {
1752        let route_id = match ctx.route_id.as_deref() {
1753            Some(id) => id,
1754            None => return false,
1755        };
1756
1757        // Get route cache config for stale settings
1758        let config = match self.cache_manager.get_route_config(route_id) {
1759            Some(c) => c,
1760            None => return false,
1761        };
1762
1763        // If there's an upstream error, use stale-if-error
1764        if let Some(e) = error {
1765            // Only serve stale for upstream errors
1766            if e.esource() == &pingora::ErrorSource::Upstream {
1767                debug!(
1768                    correlation_id = %ctx.trace_id,
1769                    route_id = %route_id,
1770                    error = %e,
1771                    stale_if_error_secs = config.stale_if_error_secs,
1772                    "Considering stale-if-error"
1773                );
1774                return config.stale_if_error_secs > 0;
1775            }
1776        }
1777
1778        // During stale-while-revalidate (error is None)
1779        if error.is_none() && config.stale_while_revalidate_secs > 0 {
1780            trace!(
1781                correlation_id = %ctx.trace_id,
1782                route_id = %route_id,
1783                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1784                "Allowing stale-while-revalidate"
1785            );
1786            return true;
1787        }
1788
1789        false
1790    }
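
    // The decision above in one place, as a hypothetical predicate with the same
    // truth table (upstream errors use stale-if-error, revalidation uses
    // stale-while-revalidate, any other error fails normally):
    //
    //     enum StaleTrigger {
    //         UpstreamError,
    //         Revalidation,
    //         OtherError,
    //     }
    //
    //     fn serve_stale(trigger: StaleTrigger, stale_if_error_secs: u64, swr_secs: u64) -> bool {
    //         match trigger {
    //             StaleTrigger::UpstreamError => stale_if_error_secs > 0,
    //             StaleTrigger::Revalidation => swr_secs > 0,
    //             StaleTrigger::OtherError => false,
    //         }
    //     }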
1791
1792    /// Handle Range header for byte-range requests (streaming support).
1793    ///
1794    /// This method is called when a Range header is present in the request.
1795    /// It allows proper handling of:
1796    /// - Video streaming (HTML5 video seeking)
1797    /// - Large file downloads with resume support
1798    /// - Partial content delivery
1799    ///
1800    /// Uses Pingora's built-in range handling with route-specific logging.
1801    fn range_header_filter(
1802        &self,
1803        session: &mut Session,
1804        response: &mut ResponseHeader,
1805        ctx: &mut Self::CTX,
1806    ) -> pingora_proxy::RangeType
1807    where
1808        Self::CTX: Send + Sync,
1809    {
1810        // Check if route supports range requests
1811        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1812            // Static and web routes support range requests; routes without a config default to allowing them
1813            matches!(
1814                config.service_type,
1815                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1816            )
1817        });
1818
1819        if !supports_range {
1820            trace!(
1821                correlation_id = %ctx.trace_id,
1822                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1823                "Range request not supported for this route type"
1824            );
1825            return pingora_proxy::RangeType::None;
1826        }
1827
1828        // Use Pingora's built-in range header parsing and handling
1829        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1830
1831        match &range_type {
1832            pingora_proxy::RangeType::None => {
1833                trace!(
1834                    correlation_id = %ctx.trace_id,
1835                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1836                    "No range request or not applicable"
1837                );
1838            }
1839            pingora_proxy::RangeType::Single(range) => {
1840                trace!(
1841                    correlation_id = %ctx.trace_id,
1842                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1843                    range_start = range.start,
1844                    range_end = range.end,
1845                    "Processing single-range request"
1846                );
1847            }
1848            pingora_proxy::RangeType::Multi(multi) => {
1849                trace!(
1850                    correlation_id = %ctx.trace_id,
1851                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1852                    range_count = multi.ranges.len(),
1853                    "Processing multi-range request"
1854                );
1855            }
1856            pingora_proxy::RangeType::Invalid => {
1857                debug!(
1858                    correlation_id = %ctx.trace_id,
1859                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1860                    "Invalid range header"
1861                );
1862            }
1863        }
1864
1865        range_type
1866    }
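
    // Pingora's range_header_filter does the actual parsing above; the sketch
    // below only illustrates the single-range math behind a 206 response (end is
    // inclusive, per RFC 9110), with None standing in for 416 Range Not
    // Satisfiable. `clamp_range` is a hypothetical helper:
    //
    //     fn clamp_range(start: u64, end_inclusive: Option<u64>, content_len: u64) -> Option<(u64, u64)> {
    //         if content_len == 0 || start >= content_len {
    //             return None;
    //         }
    //         let end = end_inclusive.map_or(content_len - 1, |e| e.min(content_len - 1));
    //         if end < start {
    //             return None;
    //         }
    //         Some((start, end))
    //     }
    //
    //     // clamp_range(0, Some(1023), 10_000) == Some((0, 1023))
    //     //   -> "Content-Range: bytes 0-1023/10000"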
1867
1868    /// Handle fatal proxy errors by generating custom error pages.
1869    /// Called when the proxy itself fails to process the request.
1870    async fn fail_to_proxy(
1871        &self,
1872        session: &mut Session,
1873        e: &Error,
1874        ctx: &mut Self::CTX,
1875    ) -> pingora_proxy::FailToProxy
1876    where
1877        Self::CTX: Send + Sync,
1878    {
1879        let error_code = match e.etype() {
1880            // Connection errors
1881            ErrorType::ConnectRefused => 503,
1882            ErrorType::ConnectTimedout => 504,
1883            ErrorType::ConnectNoRoute => 502,
1884
1885            // Timeout errors
1886            ErrorType::ReadTimedout => 504,
1887            ErrorType::WriteTimedout => 504,
1888
1889            // TLS errors
1890            ErrorType::TLSHandshakeFailure => 502,
1891            ErrorType::InvalidCert => 502,
1892
1893            // Protocol errors
1894            ErrorType::InvalidHTTPHeader => 400,
1895            ErrorType::H2Error => 502,
1896
1897            // Proxy and connection errors
1898            ErrorType::ConnectProxyFailure => 502,
1899            ErrorType::ConnectionClosed => 502,
1900
1901            // Explicit HTTP status (e.g., from agent fail-closed blocking)
1902            ErrorType::HTTPStatus(status) => *status,
1903
1904            // Internal errors - return 502 for upstream issues (more accurate than 500)
1905            ErrorType::InternalError => {
1906                // Check if this is an upstream-related error
1907                let error_str = e.to_string();
1908                if error_str.contains("upstream")
1909                    || error_str.contains("DNS")
1910                    || error_str.contains("resolve")
1911                {
1912                    502
1913                } else {
1914                    500
1915                }
1916            }
1917
1918            // Default to 502 for unknown errors
1919            _ => 502,
1920        };
1921
1922        error!(
1923            correlation_id = %ctx.trace_id,
1924            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1925            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1926            error_type = ?e.etype(),
1927            error = %e,
1928            error_code = error_code,
1929            "Proxy error occurred"
1930        );
1931
1932        // Record the error in metrics
1933        self.metrics
1934            .record_blocked_request(&format!("proxy_error_{}", error_code));
1935
1936        // Write error response to ensure client receives a proper HTTP response
1937        // This is necessary because some errors occur before the upstream connection
1938        // is established, and Pingora may not send a response automatically
1939        let error_message = match error_code {
1940            400 => "Bad Request",
1941            502 => "Bad Gateway",
1942            503 => "Service Unavailable",
1943            504 => "Gateway Timeout",
1944            _ => "Internal Server Error",
1945        };
1946
1947        // Build a minimal error response body
1948        let body = format!(
1949            r#"{{"error":"{} {}","trace_id":"{}"}}"#,
1950            error_code, error_message, ctx.trace_id
1951        );
1952
1953        // Write the response header
1954        let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
1955        header
1956            .insert_header("Content-Type", "application/json")
1957            .ok();
1958        header
1959            .insert_header("Content-Length", body.len().to_string())
1960            .ok();
1961        header
1962            .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
1963            .ok();
1964        header.insert_header("Connection", "close").ok();
1965
1966        // Write headers and body
1967        if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
1968            warn!(
1969                correlation_id = %ctx.trace_id,
1970                error = %write_err,
1971                "Failed to write error response header"
1972            );
1973        } else {
1974            // Write the body
1975            if let Err(write_err) = session
1976                .write_response_body(Some(bytes::Bytes::from(body)), true)
1977                .await
1978            {
1979                warn!(
1980                    correlation_id = %ctx.trace_id,
1981                    error = %write_err,
1982                    "Failed to write error response body"
1983                );
1984            }
1985        }
1986
1987        // Return the error response info
1988        // can_reuse_downstream: false since we already wrote and closed the response
1989        pingora_proxy::FailToProxy {
1990            error_code,
1991            can_reuse_downstream: false,
1992        }
1993    }
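
    // For reference, the JSON body built above comes out as shown by this
    // equivalent sketch (trace IDs are assumed not to need JSON escaping):
    //
    //     fn error_body(code: u16, message: &str, trace_id: &str) -> String {
    //         format!(r#"{{"error":"{} {}","trace_id":"{}"}}"#, code, message, trace_id)
    //     }
    //
    //     // error_body(503, "Service Unavailable", "abc123")
    //     //   == r#"{"error":"503 Service Unavailable","trace_id":"abc123"}"#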
1994
1995    /// Handle errors that occur during proxying after upstream connection is established.
1996    ///
1997    /// This method enables retry logic and circuit breaker integration.
1998    /// It's called when an error occurs during the request/response exchange
1999    /// with the upstream server.
2000    fn error_while_proxy(
2001        &self,
2002        peer: &HttpPeer,
2003        session: &mut Session,
2004        e: Box<Error>,
2005        ctx: &mut Self::CTX,
2006        client_reused: bool,
2007    ) -> Box<Error> {
2008        let error_type = e.etype().clone();
2009        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
2010
2011        // Classify error for retry decisions
2012        let is_retryable = matches!(
2013            error_type,
2014            ErrorType::ConnectTimedout
2015                | ErrorType::ReadTimedout
2016                | ErrorType::WriteTimedout
2017                | ErrorType::ConnectionClosed
2018                | ErrorType::ConnectRefused
2019        );
2020
2021        // Log the error with context
2022        warn!(
2023            correlation_id = %ctx.trace_id,
2024            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2025            upstream = %upstream_id,
2026            peer_address = %peer.address(),
2027            error_type = ?error_type,
2028            error = %e,
2029            client_reused = client_reused,
2030            is_retryable = is_retryable,
2031            "Error during proxy operation"
2032        );
2033
2034        // Record failure with circuit breaker via upstream pool
2035        // This is done asynchronously since we can't await in a sync fn
2036        let peer_address = peer.address().to_string();
2037        let upstream_pools = self.upstream_pools.clone();
2038        let upstream_id_owned = upstream_id.to_string();
2039        tokio::spawn(async move {
2040            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
2041                pool.report_result(&peer_address, false).await;
2042            }
2043        });
2044
2045        // Metrics tracking
2046        self.metrics
2047            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
2048
2049        // Create enhanced error with retry information
2050        let mut enhanced_error = e.more_context(format!(
2051            "Upstream: {}, Peer: {}, Attempts: {}",
2052            upstream_id,
2053            peer.address(),
2054            ctx.upstream_attempts
2055        ));
2056
2057        // Determine if retry should be attempted:
2058        // - Only retryable error types are eligible
2059        // - When the connection was reused, retry only if the request retry buffer
2060        //   is still intact (so the request can be replayed)
2061        if is_retryable {
2062            let can_retry = if client_reused {
2063                // For reused connections, check if retry buffer is intact
2064                !session.as_ref().retry_buffer_truncated()
2065            } else {
2066                // Fresh connections can always retry
2067                true
2068            };
2069
2070            enhanced_error.retry.decide_reuse(can_retry);
2071
2072            if can_retry {
2073                debug!(
2074                    correlation_id = %ctx.trace_id,
2075                    upstream = %upstream_id,
2076                    error_type = ?error_type,
2077                    "Error is retryable, will attempt retry"
2078                );
2079            }
2080        } else {
2081            // Non-retryable error - don't retry
2082            enhanced_error.retry.decide_reuse(false);
2083        }
2084
2085        enhanced_error
2086    }
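
    // The retry decision above condensed into a single predicate (hypothetical
    // helper with the same truth table as the branch logic):
    //
    //     fn can_retry(retryable_error: bool, connection_reused: bool, retry_buffer_truncated: bool) -> bool {
    //         retryable_error && (!connection_reused || !retry_buffer_truncated)
    //     }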
2087
2088    async fn logging(&self, session: &mut Session, error: Option<&Error>, ctx: &mut Self::CTX) {
2089        // Decrement active requests
2090        self.reload_coordinator.dec_requests();
2091
2092        let duration = ctx.elapsed();
2093
2094        // Get response status
2095        let status = session
2096            .response_written()
2097            .map(|r| r.status.as_u16())
2098            .unwrap_or(0);
2099
2100        // Write to access log file if configured
2101        if self.log_manager.access_log_enabled() {
2102            let access_entry = AccessLogEntry {
2103                timestamp: chrono::Utc::now().to_rfc3339(),
2104                trace_id: ctx.trace_id.clone(),
2105                method: ctx.method.clone(),
2106                path: ctx.path.clone(),
2107                query: ctx.query.clone(),
2108                protocol: "HTTP/1.1".to_string(),
2109                status,
2110                body_bytes: ctx.response_bytes,
2111                duration_ms: duration.as_millis() as u64,
2112                client_ip: ctx.client_ip.clone(),
2113                user_agent: ctx.user_agent.clone(),
2114                referer: ctx.referer.clone(),
2115                host: ctx.host.clone(),
2116                route_id: ctx.route_id.clone(),
2117                upstream: ctx.upstream.clone(),
2118                upstream_attempts: ctx.upstream_attempts,
2119                instance_id: self.app_state.instance_id.clone(),
2120            };
2121            self.log_manager.log_access(&access_entry);
2122        }
2123
2124        // Log to tracing at debug level (avoid allocations if debug disabled)
2125        if tracing::enabled!(tracing::Level::DEBUG) {
2126            debug!(
2127                trace_id = %ctx.trace_id,
2128                method = %ctx.method,
2129                path = %ctx.path,
2130                route_id = ?ctx.route_id,
2131                upstream = ?ctx.upstream,
2132                status = status,
2133                duration_ms = duration.as_millis() as u64,
2134                upstream_attempts = ctx.upstream_attempts,
2135                error = ?error.map(|e| e.to_string()),
2136                "Request completed"
2137            );
2138        }
2139
2140        // Log WebSocket upgrades at info level
2141        if ctx.is_websocket_upgrade && status == 101 {
2142            info!(
2143                trace_id = %ctx.trace_id,
2144                route_id = ?ctx.route_id,
2145                upstream = ?ctx.upstream,
2146                client_ip = %ctx.client_ip,
2147                "WebSocket connection established"
2148            );
2149        }
2150    }
2151}
2152
2153// =============================================================================
2154// Helper methods for body streaming (not part of ProxyHttp trait)
2155// =============================================================================
2156
2157impl SentinelProxy {
2158    /// Process a single body chunk in streaming mode.
2159    async fn process_body_chunk_streaming(
2160        &self,
2161        body: &mut Option<Bytes>,
2162        end_of_stream: bool,
2163        ctx: &mut RequestContext,
2164    ) -> Result<(), Box<Error>> {
2165        // Clone the chunk data to avoid borrowing issues when mutating body later
2166        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
2167        let chunk_index = ctx.request_body_chunk_index;
2168        ctx.request_body_chunk_index += 1;
2169        ctx.body_bytes_inspected += chunk_data.len() as u64;
2170
2171        debug!(
2172            correlation_id = %ctx.trace_id,
2173            chunk_index = chunk_index,
2174            chunk_size = chunk_data.len(),
2175            end_of_stream = end_of_stream,
2176            "Streaming body chunk to agents"
2177        );
2178
2179        // Create agent call context
2180        let agent_ctx = crate::agents::AgentCallContext {
2181            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2182            metadata: sentinel_agent_protocol::RequestMetadata {
2183                correlation_id: ctx.trace_id.clone(),
2184                request_id: ctx.trace_id.clone(),
2185                client_ip: ctx.client_ip.clone(),
2186                client_port: 0,
2187                server_name: ctx.host.clone(),
2188                protocol: "HTTP/1.1".to_string(),
2189                tls_version: None,
2190                tls_cipher: None,
2191                route_id: ctx.route_id.clone(),
2192                upstream_id: ctx.upstream.clone(),
2193                timestamp: chrono::Utc::now().to_rfc3339(),
2194            },
2195            route_id: ctx.route_id.clone(),
2196            upstream_id: ctx.upstream.clone(),
2197            request_body: None, // Not used in streaming mode
2198            response_body: None,
2199        };
2200
2201        let agent_ids = ctx.body_inspection_agents.clone();
2202        let total_size = None; // Unknown in streaming mode
2203
2204        match self
2205            .agent_manager
2206            .process_request_body_streaming(
2207                &agent_ctx,
2208                &chunk_data,
2209                end_of_stream,
2210                chunk_index,
2211                ctx.body_bytes_inspected as usize,
2212                total_size,
2213                &agent_ids,
2214            )
2215            .await
2216        {
2217            Ok(decision) => {
2218                // Track if agent needs more data
2219                ctx.agent_needs_more = decision.needs_more;
2220
2221                // Apply body mutation if present
2222                if let Some(ref mutation) = decision.request_body_mutation {
2223                    if !mutation.is_pass_through() {
2224                        if mutation.is_drop() {
2225                            // Drop the chunk
2226                            *body = None;
2227                            trace!(
2228                                correlation_id = %ctx.trace_id,
2229                                chunk_index = chunk_index,
2230                                "Agent dropped body chunk"
2231                            );
2232                        } else if let Some(ref new_data) = mutation.data {
2233                            // Replace chunk with mutated content
2234                            *body = Some(Bytes::from(new_data.clone()));
2235                            trace!(
2236                                correlation_id = %ctx.trace_id,
2237                                chunk_index = chunk_index,
2238                                original_size = chunk_data.len(),
2239                                new_size = new_data.len(),
2240                                "Agent mutated body chunk"
2241                            );
2242                        }
2243                    }
2244                }
2245
2246                // Check decision (only final if needs_more is false)
2247                if !decision.needs_more && !decision.is_allow() {
2248                    warn!(
2249                        correlation_id = %ctx.trace_id,
2250                        action = ?decision.action,
2251                        "Agent blocked request body"
2252                    );
2253                    self.metrics.record_blocked_request("agent_body_inspection");
2254
2255                    let (status, message) = match &decision.action {
2256                        crate::agents::AgentAction::Block { status, body, .. } => (
2257                            *status,
2258                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2259                        ),
2260                        _ => (403, "Forbidden".to_string()),
2261                    };
2262
2263                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2264                }
2265
2266                trace!(
2267                    correlation_id = %ctx.trace_id,
2268                    needs_more = decision.needs_more,
2269                    "Agent processed body chunk"
2270                );
2271            }
2272            Err(e) => {
2273                let fail_closed = ctx
2274                    .route_config
2275                    .as_ref()
2276                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2277                    .unwrap_or(false);
2278
2279                if fail_closed {
2280                    error!(
2281                        correlation_id = %ctx.trace_id,
2282                        error = %e,
2283                        "Agent streaming body inspection failed, blocking (fail-closed)"
2284                    );
2285                    return Err(Error::explain(
2286                        ErrorType::HTTPStatus(503),
2287                        "Service unavailable",
2288                    ));
2289                } else {
2290                    warn!(
2291                        correlation_id = %ctx.trace_id,
2292                        error = %e,
2293                        "Agent streaming body inspection failed, allowing (fail-open)"
2294                    );
2295                }
2296            }
2297        }
2298
2299        Ok(())
2300    }
2301
2302    /// Send buffered body to agents (buffer mode).
2303    async fn send_buffered_body_to_agents(
2304        &self,
2305        end_of_stream: bool,
2306        ctx: &mut RequestContext,
2307    ) -> Result<(), Box<Error>> {
2308        debug!(
2309            correlation_id = %ctx.trace_id,
2310            buffer_size = ctx.body_buffer.len(),
2311            end_of_stream = end_of_stream,
2312            agent_count = ctx.body_inspection_agents.len(),
2313            decompression_enabled = ctx.decompression_enabled,
2314            "Sending buffered body to agents for inspection"
2315        );
2316
2317        // Decompress body if enabled and we have a supported encoding
2318        let body_for_inspection = if ctx.decompression_enabled {
2319            if let Some(ref encoding) = ctx.body_content_encoding {
2320                let config = crate::decompression::DecompressionConfig {
2321                    max_ratio: ctx.max_decompression_ratio,
2322                    max_output_bytes: ctx.max_decompression_bytes,
2323                };
2324
2325                match crate::decompression::decompress_body(
2326                    &ctx.body_buffer,
2327                    encoding,
2328                    &config,
2329                ) {
2330                    Ok(result) => {
2331                        ctx.body_was_decompressed = true;
2332                        self.metrics
2333                            .record_decompression_success(encoding, result.ratio);
2334                        debug!(
2335                            correlation_id = %ctx.trace_id,
2336                            encoding = %encoding,
2337                            compressed_size = result.compressed_size,
2338                            decompressed_size = result.decompressed_size,
2339                            ratio = result.ratio,
2340                            "Body decompressed for agent inspection"
2341                        );
2342                        result.data
2343                    }
2344                    Err(e) => {
2345                        // Record failure metric
2346                        let failure_reason = match &e {
2347                            crate::decompression::DecompressionError::RatioExceeded { .. } => {
2348                                "ratio_exceeded"
2349                            }
2350                            crate::decompression::DecompressionError::SizeExceeded { .. } => {
2351                                "size_exceeded"
2352                            }
2353                            crate::decompression::DecompressionError::InvalidData { .. } => {
2354                                "invalid_data"
2355                            }
2356                            crate::decompression::DecompressionError::UnsupportedEncoding { .. } => {
2357                                "unsupported"
2358                            }
2359                            crate::decompression::DecompressionError::IoError(_) => "io_error",
2360                        };
2361                        self.metrics
2362                            .record_decompression_failure(encoding, failure_reason);
2363
2364                        // Decompression failed - decide based on failure mode
2365                        let fail_closed = ctx
2366                            .route_config
2367                            .as_ref()
2368                            .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2369                            .unwrap_or(false);
2370
2371                        if fail_closed {
2372                            error!(
2373                                correlation_id = %ctx.trace_id,
2374                                error = %e,
2375                                encoding = %encoding,
2376                                "Decompression failed, blocking (fail-closed)"
2377                            );
2378                            return Err(Error::explain(
2379                                ErrorType::HTTPStatus(400),
2380                                "Invalid compressed body",
2381                            ));
2382                        } else {
2383                            warn!(
2384                                correlation_id = %ctx.trace_id,
2385                                error = %e,
2386                                encoding = %encoding,
2387                                "Decompression failed, sending compressed body (fail-open)"
2388                            );
2389                            ctx.body_buffer.clone()
2390                        }
2391                    }
2392                }
2393            } else {
2394                ctx.body_buffer.clone()
2395            }
2396        } else {
2397            ctx.body_buffer.clone()
2398        };
2399
2400        let agent_ctx = crate::agents::AgentCallContext {
2401            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2402            metadata: sentinel_agent_protocol::RequestMetadata {
2403                correlation_id: ctx.trace_id.clone(),
2404                request_id: ctx.trace_id.clone(),
2405                client_ip: ctx.client_ip.clone(),
2406                client_port: 0,
2407                server_name: ctx.host.clone(),
2408                protocol: "HTTP/1.1".to_string(),
2409                tls_version: None,
2410                tls_cipher: None,
2411                route_id: ctx.route_id.clone(),
2412                upstream_id: ctx.upstream.clone(),
2413                timestamp: chrono::Utc::now().to_rfc3339(),
2414            },
2415            route_id: ctx.route_id.clone(),
2416            upstream_id: ctx.upstream.clone(),
2417            request_body: Some(body_for_inspection.clone()),
2418            response_body: None,
2419        };
2420
2421        let agent_ids = ctx.body_inspection_agents.clone();
2422        match self
2423            .agent_manager
2424            .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
2425            .await
2426        {
2427            Ok(decision) => {
2428                if !decision.is_allow() {
2429                    warn!(
2430                        correlation_id = %ctx.trace_id,
2431                        action = ?decision.action,
2432                        "Agent blocked request body"
2433                    );
2434                    self.metrics.record_blocked_request("agent_body_inspection");
2435
2436                    let (status, message) = match &decision.action {
2437                        crate::agents::AgentAction::Block { status, body, .. } => (
2438                            *status,
2439                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2440                        ),
2441                        _ => (403, "Forbidden".to_string()),
2442                    };
2443
2444                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2445                }
2446
2447                trace!(
2448                    correlation_id = %ctx.trace_id,
2449                    "Agent allowed request body"
2450                );
2451            }
2452            Err(e) => {
2453                let fail_closed = ctx
2454                    .route_config
2455                    .as_ref()
2456                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2457                    .unwrap_or(false);
2458
2459                if fail_closed {
2460                    error!(
2461                        correlation_id = %ctx.trace_id,
2462                        error = %e,
2463                        "Agent body inspection failed, blocking (fail-closed)"
2464                    );
2465                    return Err(Error::explain(
2466                        ErrorType::HTTPStatus(503),
2467                        "Service unavailable",
2468                    ));
2469                } else {
2470                    warn!(
2471                        correlation_id = %ctx.trace_id,
2472                        error = %e,
2473                        "Agent body inspection failed, allowing (fail-open)"
2474                    );
2475                }
2476            }
2477        }
2478
2479        Ok(())
2480    }
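
    // A sketch of how limits like the max_ratio / max_output_bytes values passed
    // to DecompressionConfig above are commonly enforced against decompression
    // bombs. The real checks live in crate::decompression::decompress_body and
    // may differ; `within_decompression_limits` is hypothetical:
    //
    //     fn within_decompression_limits(
    //         compressed_len: usize,
    //         produced_len: usize,
    //         max_ratio: f64,
    //         max_output_bytes: usize,
    //     ) -> bool {
    //         if produced_len > max_output_bytes {
    //             return false; // absolute output cap exceeded
    //         }
    //         if compressed_len == 0 {
    //             return produced_len == 0;
    //         }
    //         (produced_len as f64 / compressed_len as f64) <= max_ratio
    //     }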
2481}