sentinel_proxy/proxy/http_trait.rs

//! ProxyHttp trait implementation for SentinelProxy.
//!
//! This module contains the Pingora ProxyHttp trait implementation which defines
//! the core request/response lifecycle handling.

use async_trait::async_trait;
use bytes::Bytes;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora::protocols::Digest;
use pingora::proxy::{ProxyHttp, Session};
use pingora::upstreams::peer::Peer;
use pingora_cache::{
    CacheKey, CacheMeta, ForcedInvalidationKind, HitHandler, NoCacheReason, RespCacheable,
};
use pingora_timeout::sleep;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};

use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
use crate::rate_limit::HeaderAccessor;
use crate::routing::RequestInfo;

use super::context::RequestContext;
use super::SentinelProxy;

/// Helper type for rate limiting when we don't need header access
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}
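
// Illustrative only (not used in this file): a header-aware accessor for
// rate limiting keyed on request headers could wrap the request header map,
// assuming `HeaderAccessor` requires nothing beyond `get_header`:
//
//     struct ReqHeaderAccessor<'a>(&'a pingora::http::RequestHeader);
//     impl HeaderAccessor for ReqHeaderAccessor<'_> {
//         fn get_header(&self, name: &str) -> Option<String> {
//             self.0.headers.get(name).and_then(|v| v.to_str().ok()).map(String::from)
//         }
//     }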

#[async_trait]
impl ProxyHttp for SentinelProxy {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::new()
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        peer: &HttpPeer,
        ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        error!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
            peer_address = %peer.address(),
            error = %e,
            "Failed to connect to upstream peer"
        );
        // Custom error pages are handled in response_filter
        e
    }

    /// Early request filter - runs before upstream selection
    /// Used to handle builtin routes that don't need an upstream connection
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Extract request info for routing
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Match route to determine service type
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()), // No matching route, let upstream_peer handle it
            }
        };

        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Check if this is a builtin handler route
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            // Handle the builtin route directly
            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Return error to signal that request is complete (Pingora will not continue)
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // Track active request
        self.reload_coordinator.inc_requests();

        // Cache global config once per request (avoids repeated Arc clones)
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        // Cache client address for logging if not already set
        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Cache request info for access logging if not already set
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // Use cached route info if already set by early_request_filter
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: sentinel_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Match route using sync RwLock (scoped to ensure lock is released before async ops)
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                // Build request info (zero-copy for common case)
                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                // Only build headers HashMap if any route needs header matching
                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                // Only parse query params if any route needs query param matching
                if route_matcher.needs_query_params() {
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                // Lock is dropped here when block ends
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Check if this is a builtin handler route (no upstream needed)
        if route_match.config.service_type == sentinel_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            // Mark as builtin route for later processing in request_filter
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            // Return error to skip upstream connection for builtin routes
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Check if this is a static file route
        if route_match.config.service_type == sentinel_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            // Static routes don't need an upstream
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                // Mark this as a static route for later processing
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                // Return error to avoid upstream connection for static routes
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // Regular route with upstream
        if let Some(ref upstream) = route_match.config.upstream {
            ctx.upstream = Some(upstream.clone());
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                upstream = %upstream,
                "Upstream configured for route"
            );
        } else {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route has no upstream configured"
            );
            return Err(Error::explain(
                ErrorType::InternalError,
                format!(
                    "Route '{}' has no upstream configured",
                    route_match.route_id
                ),
            ));
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Get upstream pool (skip for static routes)
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            // Static routes are handled in request_filter, should not reach here
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Select peer from pool with retries
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer(None).await {
                Ok(peer) => {
                    let selection_duration = selection_start.elapsed();
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer.address(),
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        "Selected upstream peer"
                    );
                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    if attempt < max_retries {
                        // Exponential backoff (using pingora-timeout for efficiency)
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
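                        // With the 100 ms base this doubles per failed attempt:
                        // attempt 1 -> 100 ms, attempt 2 -> 200 ms, attempt 3 -> 400 ms, ...
                        // e.g. with max_attempts = 3 the waits between attempts are 100 ms and 200 ms.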
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }

    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool, Box<Error>> {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Starting request filter phase"
        );

        // Check rate limiting early (before other processing)
        // Fast path: skip if no rate limiting is configured for this route
        if let Some(route_id) = ctx.route_id.as_deref() {
            if self.rate_limit_manager.has_route_limiter(route_id) {
                let rate_result = self.rate_limit_manager.check(
                    route_id,
                    &ctx.client_ip,
                    &ctx.path,
                    Option::<&NoHeaderAccessor>::None,
                );

                if !rate_result.allowed {
                    use sentinel_config::RateLimitAction;

                    match rate_result.action {
                        RateLimitAction::Reject => {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                client_ip = %ctx.client_ip,
                                limiter = %rate_result.limiter,
                                "Request rate limited"
                            );
                            self.metrics.record_blocked_request("rate_limited");

                            // Audit log the rate limit
                            let audit_entry = AuditLogEntry::rate_limited(
                                &ctx.trace_id,
                                &ctx.method,
                                &ctx.path,
                                &ctx.client_ip,
                                &rate_result.limiter,
                            )
                            .with_route_id(route_id)
                            .with_status_code(rate_result.status_code);
                            self.log_manager.log_audit(&audit_entry);

                            // Send rate limit response
                            let body = rate_result
                                .message
                                .unwrap_or_else(|| "Rate limit exceeded".to_string());
                            crate::http_helpers::write_error(
                                session,
                                rate_result.status_code,
                                &body,
                                "text/plain",
                            )
                            .await?;
                            return Ok(true); // Request complete, don't continue
                        }
                        RateLimitAction::LogOnly => {
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit exceeded (log only mode)"
                            );
                            // Continue processing
                        }
                        RateLimitAction::Delay => {
                            // Delay handling could be implemented here with pingora_timeout::sleep
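                            // A minimal sketch, assuming the limiter exposed a delay duration
                            // (e.g. a hypothetical `rate_result.delay_ms` field), using the
                            // `pingora_timeout::sleep` already imported above:
                            //     sleep(Duration::from_millis(rate_result.delay_ms)).await;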
                            debug!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                "Rate limit delay mode not yet implemented, allowing request"
                            );
                        }
                    }
                }
            }
        }

        // Check for WebSocket upgrade requests
        let is_websocket_upgrade = session
            .req_header()
            .headers
            .get(http::header::UPGRADE)
            .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
            .unwrap_or(false);
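        // Matches requests carrying e.g. `Upgrade: websocket`; the header value is compared
        // case-insensitively, and requests without an Upgrade header fall through unchanged.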

        if is_websocket_upgrade {
            ctx.is_websocket_upgrade = true;

            // Check if route allows WebSocket upgrades
            if let Some(ref route_config) = ctx.route_config {
                if !route_config.websocket {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                        client_ip = %ctx.client_ip,
                        "WebSocket upgrade rejected: not enabled for route"
                    );

                    self.metrics.record_blocked_request("websocket_not_enabled");

                    // Audit log the rejection
                    let audit_entry = AuditLogEntry::new(
                        &ctx.trace_id,
                        AuditEventType::Blocked,
                        &ctx.method,
                        &ctx.path,
                        &ctx.client_ip,
                    )
                    .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
                    .with_action("websocket_rejected")
                    .with_reason("WebSocket not enabled for route");
                    self.log_manager.log_audit(&audit_entry);

                    // Send 403 Forbidden response
                    crate::http_helpers::write_error(
                        session,
                        403,
                        "WebSocket not enabled for this route",
                        "text/plain",
                    )
                    .await?;
                    return Ok(true); // Request complete, don't continue
                }

                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade request allowed"
                );

                // Check for WebSocket frame inspection
                if route_config.websocket_inspection {
                    // Check for compression negotiation - skip inspection if permessage-deflate
                    let has_compression = session
                        .req_header()
                        .headers
                        .get("Sec-WebSocket-Extensions")
                        .and_then(|v| v.to_str().ok())
                        .map(|s| s.contains("permessage-deflate"))
                        .unwrap_or(false);
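                    // e.g. `Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits`
                    // negotiates per-message compression; frame payloads would then be compressed
                    // on the wire, so inspection is skipped rather than attempted on opaque data.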

                    if has_compression {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            "WebSocket inspection skipped: permessage-deflate negotiated"
                        );
                        ctx.websocket_skip_inspection = true;
                    } else {
                        ctx.websocket_inspection_enabled = true;

                        // Get agents that handle WebSocketFrame events
                        ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
                            sentinel_agent_protocol::EventType::WebSocketFrame,
                        );

                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                            agent_count = ctx.websocket_inspection_agents.len(),
                            "WebSocket frame inspection enabled"
                        );
                    }
                }
            }
        }

        // Use cached route config from upstream_peer (avoids duplicate route matching)
        // Handle static file and builtin routes
        if let Some(route_config) = ctx.route_config.clone() {
            if route_config.service_type == sentinel_config::ServiceType::Static {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "Handling static file route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_static_route(session, ctx, &route_match).await;
            } else if route_config.service_type == sentinel_config::ServiceType::Builtin {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    builtin_handler = ?route_config.builtin_handler,
                    "Handling builtin route"
                );
                // Create a minimal RouteMatch for the handler
                let route_match = crate::routing::RouteMatch {
                    route_id: sentinel_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
                    config: route_config.clone(),
                };
                return self.handle_builtin_route(session, ctx, &route_match).await;
            }
        }

        // API validation for API routes
        if let Some(route_id) = ctx.route_id.clone() {
            if let Some(validator) = self.validators.get(&route_id).await {
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_id,
                    "Running API schema validation"
                );
                if let Some(result) = self
                    .validate_api_request(session, ctx, &route_id, &validator)
                    .await?
                {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_id,
                        validation_passed = result,
                        "API validation complete"
                    );
                    return Ok(result);
                }
            }
        }

        // Get client address before mutable borrow
        let client_addr = session
            .client_addr()
            .map(|a| format!("{}", a))
            .unwrap_or_else(|| "unknown".to_string());
        // Note: the client port is not actually extracted here; both branches yield 0,
        // so downstream consumers receive a placeholder port.
        let client_port = session.client_addr().map(|_| 0).unwrap_or(0);

        let req_header = session.req_header_mut();

        // Add correlation ID header
        req_header
            .insert_header("X-Correlation-Id", &ctx.trace_id)
            .ok();
        req_header.insert_header("X-Forwarded-By", "Sentinel").ok();

        // Use cached config (set in upstream_peer, or fetch now if needed)
        let config = ctx
            .config
            .get_or_insert_with(|| self.config_manager.current());

        // Enforce header limits (fast path: skip if limits are very high)
        const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; // limits at or above this are treated as effectively unlimited
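        // Example: with `max_header_count = 100`, a request carrying 150 headers is rejected
        // below; a configured limit at or above this threshold skips the check entirely.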

        // Header count check - O(1)
        let header_count = req_header.headers.len();
        if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
            && header_count > config.limits.max_header_count
        {
            warn!(
                correlation_id = %ctx.trace_id,
                header_count = header_count,
                limit = config.limits.max_header_count,
                "Request blocked: exceeds header count limit"
            );

            self.metrics.record_blocked_request("header_count_exceeded");
            return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
        }

        // Header size check - O(n), skip if limit is very high
        if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
            let total_header_size: usize = req_header
                .headers
                .iter()
                .map(|(k, v)| k.as_str().len() + v.len())
                .sum();

            if total_header_size > config.limits.max_header_size_bytes {
                warn!(
                    correlation_id = %ctx.trace_id,
                    header_size = total_header_size,
                    limit = config.limits.max_header_size_bytes,
                    "Request blocked: exceeds header size limit"
                );

                self.metrics.record_blocked_request("header_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Headers too large",
                ));
            }
        }

        // Process through external agents
        trace!(
            correlation_id = %ctx.trace_id,
            "Processing request through agents"
        );
        self.process_agents(session, ctx, &client_addr, client_port)
            .await?;

        trace!(
            correlation_id = %ctx.trace_id,
            "Request filter phase complete, forwarding to upstream"
        );

        Ok(false) // Continue processing
    }

    /// Process incoming request body chunks.
    /// Used for body size enforcement and WAF/agent inspection.
    ///
    /// Supports three streaming modes:
    /// - **Buffer mode** (default): buffer chunks until end of stream or the inspection limit, then send to agents
    /// - **Stream mode**: send each chunk to agents immediately as it arrives
    /// - **Hybrid mode**: buffer up to a threshold, then switch to streaming the remainder
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        use sentinel_config::BodyStreamingMode;

        // Handle WebSocket frame inspection (client -> server)
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let result = handler.process_client_data(body.take()).await;
                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (client->server)"
                        );
                        // Return an error to close the connection
                        return Err(Error::explain(
                            ErrorType::InternalError,
                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
                        ));
                    }
                }
            }
            // Skip normal body processing for WebSocket
            return Ok(());
        }

        // Track request body size
        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
        if chunk_len > 0 {
            ctx.request_body_bytes += chunk_len as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk_len,
                total_body_bytes = ctx.request_body_bytes,
                end_of_stream = end_of_stream,
                streaming_mode = ?ctx.request_body_streaming_mode,
                "Processing request body chunk"
            );

            // Check body size limit (use cached config)
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
                warn!(
                    correlation_id = %ctx.trace_id,
                    body_bytes = ctx.request_body_bytes,
                    limit = config.limits.max_body_size_bytes,
                    "Request body size limit exceeded"
                );
                self.metrics.record_blocked_request("body_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Request body too large",
                ));
            }
        }

        // Body inspection for agents (WAF, etc.)
        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            let max_inspection_bytes = config
                .waf
                .as_ref()
                .map(|w| w.body_inspection.max_inspection_bytes as u64)
                .unwrap_or(1024 * 1024);

            match ctx.request_body_streaming_mode {
                BodyStreamingMode::Stream => {
                    // Stream mode: send each chunk immediately
                    if body.is_some() {
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    } else if end_of_stream && ctx.agent_needs_more {
                        // Send final empty chunk to signal end
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Hybrid { buffer_threshold } => {
                    // Hybrid mode: buffer up to threshold, then stream
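                    // Worked example (illustrative numbers): with buffer_threshold = 8 KiB and
                    // 5 KiB chunks, chunk 1 is buffered in full; of chunk 2, 3 KiB is buffered
                    // (reaching the threshold), the buffered 8 KiB is flushed to the agents,
                    // and the remaining 2 KiB of chunk 2 is streamed; later chunks stream directly.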
                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
                        // Still in buffering phase
                        if let Some(ref chunk) = body {
                            let bytes_to_buffer = std::cmp::min(
                                chunk.len(),
                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
                            );
                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
                            ctx.body_bytes_inspected += bytes_to_buffer as u64;

                            // If we've reached threshold or end of stream, switch to streaming
                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
                            {
                                // Send buffered content first
                                self.send_buffered_body_to_agents(
                                    end_of_stream && chunk.len() == bytes_to_buffer,
                                    ctx,
                                )
                                .await?;
                                ctx.body_buffer.clear();

                                // If there's remaining data in this chunk, stream it
                                if bytes_to_buffer < chunk.len() {
                                    let remaining = chunk.slice(bytes_to_buffer..);
                                    let mut remaining_body = Some(remaining);
                                    self.process_body_chunk_streaming(
                                        &mut remaining_body,
                                        end_of_stream,
                                        ctx,
                                    )
                                    .await?;
                                }
                            }
                        }
                    } else {
                        // Past threshold, stream directly
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Buffer => {
                    // Buffer mode: collect chunks until ready to send
                    if let Some(ref chunk) = body {
                        if ctx.body_bytes_inspected < max_inspection_bytes {
                            let bytes_to_inspect = std::cmp::min(
                                chunk.len() as u64,
                                max_inspection_bytes - ctx.body_bytes_inspected,
                            ) as usize;

                            ctx.body_buffer
                                .extend_from_slice(&chunk[..bytes_to_inspect]);
                            ctx.body_bytes_inspected += bytes_to_inspect as u64;

                            trace!(
                                correlation_id = %ctx.trace_id,
                                bytes_inspected = ctx.body_bytes_inspected,
                                max_inspection_bytes = max_inspection_bytes,
                                buffer_size = ctx.body_buffer.len(),
                                "Buffering body for agent inspection"
                            );
                        }
                    }

                    // Send when complete or limit reached
                    let should_send =
                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
                    if should_send && !ctx.body_buffer.is_empty() {
                        self.send_buffered_body_to_agents(end_of_stream, ctx)
                            .await?;
                        ctx.body_buffer.clear();
                    }
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_body_bytes = ctx.request_body_bytes,
                bytes_inspected = ctx.body_bytes_inspected,
                "Request body complete"
            );
        }

        Ok(())
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let status = upstream_response.status.as_u16();
        let duration = ctx.elapsed();

        trace!(
            correlation_id = %ctx.trace_id,
            status = status,
            "Starting response filter phase"
        );

        // Handle WebSocket 101 Switching Protocols
        if status == 101 && ctx.is_websocket_upgrade {
            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
                // Create WebSocket inspector and handler with metrics
                let inspector = crate::websocket::WebSocketInspector::with_metrics(
                    self.agent_manager.clone(),
                    ctx.route_id
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string()),
                    ctx.trace_id.clone(),
                    ctx.client_ip.clone(),
                    100, // 100ms timeout per frame inspection
                    Some(self.metrics.clone()),
                );

                let handler = crate::websocket::WebSocketHandler::new(
                    std::sync::Arc::new(inspector),
                    1024 * 1024, // 1MB max frame size
                );

                ctx.websocket_handler = Some(std::sync::Arc::new(handler));

                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    agent_count = ctx.websocket_inspection_agents.len(),
                    "WebSocket upgrade successful, frame inspection enabled"
                );
            } else if ctx.websocket_skip_inspection {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
                );
            } else {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful"
                );
            }
        }

        // Apply security headers
        trace!(
            correlation_id = %ctx.trace_id,
            "Applying security headers"
        );
        self.apply_security_headers(upstream_response).ok();

        // Add correlation ID to response
        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;

        // Generate custom error pages for error responses
        if status >= 400 {
            trace!(
                correlation_id = %ctx.trace_id,
                status = status,
                "Handling error response"
            );
            self.handle_error_response(upstream_response, ctx).await?;
        }

        // Record metrics
        self.metrics.record_request(
            ctx.route_id.as_deref().unwrap_or("unknown"),
            &ctx.method,
            status,
            duration,
        );

        // Record passive health check
        if let Some(ref upstream) = ctx.upstream {
            let success = status < 500;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream,
                success = success,
                status = status,
                "Recording passive health check result"
            );

            self.passive_health.record_outcome(upstream, success).await;

            // Report to upstream pool
            if let Some(pool) = self.upstream_pools.get(upstream).await {
                pool.report_result(upstream, success).await;
            }

            if !success {
                warn!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream,
                    status = status,
                    "Upstream returned error status"
                );
            }
        }

        // Final request completion log
        if status >= 500 {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed with server error"
            );
        } else if status >= 400 {
            warn!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                "Request completed with client error"
            );
        } else {
            debug!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed"
            );
        }

        Ok(())
    }

    /// Modify the request before sending to upstream.
    /// Used for header modifications, adding authentication, etc.
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut pingora::http::RequestHeader,
        ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        trace!(
            correlation_id = %ctx.trace_id,
            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
            "Applying upstream request modifications"
        );

        // Add trace ID header for upstream correlation
        upstream_request
            .insert_header("X-Trace-Id", &ctx.trace_id)
            .ok();

        // Add request metadata headers
        upstream_request
            .insert_header("X-Forwarded-By", "Sentinel")
            .ok();

        // Apply route-specific request header modifications
        // Clone the modifications to avoid lifetime issues with the header API
        if let Some(ref route_config) = ctx.route_config {
            let mods = route_config.policies.request_headers.clone();

            // Set headers (overwrite existing)
            for (name, value) in mods.set {
                upstream_request.insert_header(name, value).ok();
            }

            // Add headers (append)
            for (name, value) in mods.add {
                upstream_request.append_header(name, value).ok();
            }

            // Remove headers
            for name in &mods.remove {
                upstream_request.remove_header(name);
            }

            trace!(
                correlation_id = %ctx.trace_id,
                "Applied request header modifications"
            );
        }

        // Remove sensitive headers that shouldn't go to upstream
        upstream_request.remove_header("X-Internal-Token");
        upstream_request.remove_header("Authorization-Internal");

        Ok(())
    }

    /// Process response body chunks from upstream.
    /// Used for response size tracking and WAF inspection.
    ///
    /// Note: Response body inspection is currently buffered only (streaming mode not supported
    /// for responses due to Pingora's synchronous filter design).
    fn response_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>, Box<Error>> {
        // Handle WebSocket frame inspection (server -> client)
        // Note: This filter is synchronous, so we use block_in_place for async agent calls
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                let handler = handler.clone();
                let data = body.take();

                // Use block_in_place to run async handler from sync context
                // This is safe because Pingora uses a multi-threaded tokio runtime
                let result = tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current()
                        .block_on(async { handler.process_server_data(data).await })
                });

                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (server->client)"
                        );
                        // For sync filter, we can't return an error that closes the connection
                        // Instead, inject a close frame
                        let close_frame =
                            crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
                        let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
                        if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
                            *body = Some(Bytes::from(encoded));
                        }
                    }
                }
            }
            // Skip normal body processing for WebSocket
            return Ok(None);
        }

        // Track response body size
        if let Some(ref chunk) = body {
            ctx.response_bytes += chunk.len() as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk.len(),
                total_response_bytes = ctx.response_bytes,
                end_of_stream = end_of_stream,
                "Processing response body chunk"
            );

            // Response body inspection (buffered mode only)
            // Note: Streaming mode for response bodies is not currently supported
            // due to Pingora's synchronous response_body_filter design
            if ctx.response_body_inspection_enabled
                && !ctx.response_body_inspection_agents.is_empty()
            {
                let config = ctx
                    .config
                    .get_or_insert_with(|| self.config_manager.current());
                let max_inspection_bytes = config
                    .waf
                    .as_ref()
                    .map(|w| w.body_inspection.max_inspection_bytes as u64)
                    .unwrap_or(1024 * 1024);

                if ctx.response_body_bytes_inspected < max_inspection_bytes {
                    let bytes_to_inspect = std::cmp::min(
                        chunk.len() as u64,
                        max_inspection_bytes - ctx.response_body_bytes_inspected,
                    ) as usize;

                    // Buffer for later processing (during logging phase)
                    // Response body inspection happens asynchronously and results
                    // are logged rather than blocking the response
                    ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
                    ctx.response_body_chunk_index += 1;

                    trace!(
                        correlation_id = %ctx.trace_id,
                        bytes_inspected = ctx.response_body_bytes_inspected,
                        max_inspection_bytes = max_inspection_bytes,
                        chunk_index = ctx.response_body_chunk_index,
                        "Tracking response body for inspection"
                    );
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_response_bytes = ctx.response_bytes,
                response_bytes_inspected = ctx.response_body_bytes_inspected,
                "Response body complete"
            );
        }

        // Return None to indicate no delay needed
        Ok(None)
    }

    /// Called when a connection to upstream is established or reused.
    /// Logs connection reuse statistics for observability.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        reused: bool,
        peer: &HttpPeer,
        #[cfg(unix)] _fd: RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        digest: Option<&Digest>,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        // Track connection reuse for metrics
        ctx.connection_reused = reused;

        // Log connection establishment/reuse
        if reused {
            trace!(
                correlation_id = %ctx.trace_id,
                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
                peer_address = %peer.address(),
                "Reusing existing upstream connection"
            );
        } else {
            debug!(
                correlation_id = %ctx.trace_id,
                upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
                peer_address = %peer.address(),
                ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
                "Established new upstream connection"
            );
        }

        Ok(())
    }

    // =========================================================================
    // HTTP Caching - Pingora Cache Integration
    // =========================================================================
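    // Rough callback order, as used here: `request_cache_filter` decides whether to enable
    // caching for the request, `cache_key_callback` produces the lookup key, and the
    // miss/hit callbacks below run once Pingora has performed the lookup.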
1308
1309    /// Decide if the request should use caching.
1310    ///
1311    /// This method is called early in the request lifecycle to determine if
1312    /// the response should be served from cache or if the response should
1313    /// be cached.
1314    fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
1315        // Check if route has caching enabled
1316        let route_id = match ctx.route_id.as_deref() {
1317            Some(id) => id,
1318            None => {
1319                trace!(
1320                    correlation_id = %ctx.trace_id,
1321                    "Cache filter: no route ID, skipping cache"
1322                );
1323                return Ok(());
1324            }
1325        };
1326
1327        // Check if caching is enabled for this route
1328        if !self.cache_manager.is_enabled(route_id) {
1329            trace!(
1330                correlation_id = %ctx.trace_id,
1331                route_id = %route_id,
1332                "Cache disabled for route"
1333            );
1334            return Ok(());
1335        }
1336
1337        // Check if method is cacheable (typically GET/HEAD)
1338        if !self
1339            .cache_manager
1340            .is_method_cacheable(route_id, &ctx.method)
1341        {
1342            trace!(
1343                correlation_id = %ctx.trace_id,
1344                route_id = %route_id,
1345                method = %ctx.method,
1346                "Method not cacheable"
1347            );
1348            return Ok(());
1349        }
1350
1351        // Enable caching for this request using Pingora's cache infrastructure
1352        debug!(
1353            correlation_id = %ctx.trace_id,
1354            route_id = %route_id,
1355            method = %ctx.method,
1356            path = %ctx.path,
1357            "Enabling HTTP caching for request"
1358        );
1359
1360        // Get static references to cache infrastructure
1361        let storage = get_cache_storage();
1362        let eviction = get_cache_eviction();
1363        let cache_lock = get_cache_lock();
1364
1365        // Enable the cache with storage, eviction, and lock
1366        session.cache.enable(
1367            storage,
1368            Some(eviction),
1369            None, // predictor - optional
1370            Some(cache_lock),
1371            None, // option overrides
1372        );
1373
1374        // Mark request as cache-eligible in context
1375        ctx.cache_eligible = true;
1376
1377        trace!(
1378            correlation_id = %ctx.trace_id,
1379            route_id = %route_id,
1380            cache_enabled = session.cache.enabled(),
1381            "Cache enabled for request"
1382        );
1383
1384        Ok(())
1385    }
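
    // A rough sketch of the kind of per-route cache settings this filter consults.
    // The field names below are illustrative only, not the actual sentinel_config schema:
    //
    //   [routes.assets.cache]
    //   enabled          = true             # -> cache_manager.is_enabled(route_id)
    //   methods          = ["GET", "HEAD"]  # -> is_method_cacheable(route_id, method)
    //   default_ttl_secs = 300              # -> calculate_ttl() fallback in response_cache_filter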
1386
1387    /// Generate the cache key for this request.
1388    ///
1389    /// The key string built from method, host, path, and query is used for logging and
1390    /// purge matching; the key actually returned is Pingora's default request-header key.
1391    fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
1392        let req_header = session.req_header();
1393        let method = req_header.method.as_str();
1394        let path = req_header.uri.path();
1395        let host = ctx.host.as_deref().unwrap_or("unknown");
1396        let query = req_header.uri.query();
1397
1398        // Generate cache key using our cache manager
1399        let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1400
1401        trace!(
1402            correlation_id = %ctx.trace_id,
1403            cache_key = %key_string,
1404            "Generated cache key"
1405        );
1406
1407        // Use Pingora's default cache key generator which handles
1408        // proper hashing and internal format
1409        Ok(CacheKey::default(req_header))
1410    }
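
    // For example, assuming generate_cache_key simply joins its parts, a request like
    //   GET /assets/app.js?v=2 with Host: example.com
    // would log a key string along the lines of "GET:example.com:/assets/app.js?v=2",
    // while the CacheKey handed back to Pingora stays its default, derived from the
    // full request header.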
1411
1412    /// Called when a cache miss occurs.
1413    ///
1414    /// This is called when the cache lookup found no matching entry.
1415    /// We can use this to log and track cache misses.
1416    fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
1417        // Let Pingora handle the cache miss
1418        session.cache.cache_miss();
1419
1420        // Track statistics
1421        if let Some(route_id) = ctx.route_id.as_deref() {
1422            self.cache_manager.stats().record_miss();
1423
1424            trace!(
1425                correlation_id = %ctx.trace_id,
1426                route_id = %route_id,
1427                path = %ctx.path,
1428                "Cache miss"
1429            );
1430        }
1431    }
1432
1433    /// Called after a successful cache lookup.
1434    ///
1435    /// This filter allows inspecting the cached response before serving it.
1436    /// Returns `None` to serve the cached response, or a `ForcedInvalidationKind`
1437    /// to invalidate and refetch.
1438    async fn cache_hit_filter(
1439        &self,
1440        session: &mut Session,
1441        meta: &CacheMeta,
1442        _hit_handler: &mut HitHandler,
1443        is_fresh: bool,
1444        ctx: &mut Self::CTX,
1445    ) -> Result<Option<ForcedInvalidationKind>>
1446    where
1447        Self::CTX: Send + Sync,
1448    {
1449        // Check if this cache entry should be invalidated due to a purge request
1450        let req_header = session.req_header();
1451        let method = req_header.method.as_str();
1452        let path = req_header.uri.path();
1453        let host = ctx.host.as_deref().or_else(|| req_header.uri.host()).unwrap_or("localhost");
1454        let query = req_header.uri.query();
1455
1456        // Generate the cache key for this request
1457        let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
1458
1459        // Check if this key should be invalidated
1460        if self.cache_manager.should_invalidate(&cache_key) {
1461            info!(
1462                correlation_id = %ctx.trace_id,
1463                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1464                cache_key = %cache_key,
1465                "Cache entry invalidated by purge request"
1466            );
1467            // Force expiration so the entry is refetched from upstream
1468            return Ok(Some(ForcedInvalidationKind::ForceExpired));
1469        }
1470
1471        // Track cache hit statistics
1472        if is_fresh {
1473            self.cache_manager.stats().record_hit();
1474
1475            debug!(
1476                correlation_id = %ctx.trace_id,
1477                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1478                is_fresh = is_fresh,
1479                "Cache hit (fresh)"
1480            );
1481        } else {
1482            trace!(
1483                correlation_id = %ctx.trace_id,
1484                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1485                is_fresh = is_fresh,
1486                "Cache hit (stale)"
1487            );
1488        }
1489
1490        // Serve the cached response without invalidation
1491        Ok(None)
1492    }
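
    // Purge flow in brief: a purge request (handled elsewhere) marks the key in the
    // cache manager, should_invalidate() then matches it here, the hit is forced to
    // expire, and the entry is refetched from upstream. Otherwise fresh hits count
    // toward record_hit(), while stale hits are only traced; whether a stale entry may
    // actually be served is decided later by should_serve_stale().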
1493
1494    /// Decide if the response should be cached.
1495    ///
1496    /// Called after receiving the response from upstream to determine
1497    /// if it should be stored in the cache.
1498    fn response_cache_filter(
1499        &self,
1500        _session: &Session,
1501        resp: &ResponseHeader,
1502        ctx: &mut Self::CTX,
1503    ) -> Result<RespCacheable> {
1504        let route_id = match ctx.route_id.as_deref() {
1505            Some(id) => id,
1506            None => {
1507                return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1508                    "no_route",
1509                )));
1510            }
1511        };
1512
1513        // Check if caching is enabled for this route
1514        if !self.cache_manager.is_enabled(route_id) {
1515            return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
1516                "disabled",
1517            )));
1518        }
1519
1520        let status = resp.status.as_u16();
1521
1522        // Check if status code is cacheable
1523        if !self.cache_manager.is_status_cacheable(route_id, status) {
1524            trace!(
1525                correlation_id = %ctx.trace_id,
1526                route_id = %route_id,
1527                status = status,
1528                "Status code not cacheable"
1529            );
1530            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1531        }
1532
1533        // Check Cache-Control header for no-store, no-cache, private
1534        if let Some(cache_control) = resp.headers.get("cache-control") {
1535            if let Ok(cc_str) = cache_control.to_str() {
1536                if crate::cache::CacheManager::is_no_cache(cc_str) {
1537                    trace!(
1538                        correlation_id = %ctx.trace_id,
1539                        route_id = %route_id,
1540                        cache_control = %cc_str,
1541                        "Response has no-cache directive"
1542                    );
1543                    return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1544                }
1545            }
1546        }
1547
1548        // Calculate TTL from Cache-Control or use default
1549        let cache_control = resp
1550            .headers
1551            .get("cache-control")
1552            .and_then(|v| v.to_str().ok());
1553        let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
1554
1555        if ttl.is_zero() {
1556            trace!(
1557                correlation_id = %ctx.trace_id,
1558                route_id = %route_id,
1559                "TTL is zero, not caching"
1560            );
1561            return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
1562        }
1563
1564        // Get route cache config for stale settings
1565        let config = self
1566            .cache_manager
1567            .get_route_config(route_id)
1568            .unwrap_or_default();
1569
1570        // Create timestamps for cache metadata
1571        let now = std::time::SystemTime::now();
1572        let fresh_until = now + ttl;
1573
1574        // Clone the response header for storage
1575        let header = resp.clone();
1576
1577        // Create CacheMeta with proper timestamps and TTLs
1578        let cache_meta = CacheMeta::new(
1579            fresh_until,
1580            now,
1581            config.stale_while_revalidate_secs as u32,
1582            config.stale_if_error_secs as u32,
1583            header,
1584        );
1585
1586        // Count this as a cache store decision (the actual write is handled by Pingora's storage)
1587        self.cache_manager.stats().record_store();
1588
1589        debug!(
1590            correlation_id = %ctx.trace_id,
1591            route_id = %route_id,
1592            status = status,
1593            ttl_secs = ttl.as_secs(),
1594            stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1595            stale_if_error_secs = config.stale_if_error_secs,
1596            "Caching response"
1597        );
1598
1599        Ok(RespCacheable::Cacheable(cache_meta))
1600    }
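
    // Worked example of the decision above, assuming calculate_ttl honors max-age: a
    // 200 response with "Cache-Control: max-age=300" on a cache-enabled route becomes
    // RespCacheable::Cacheable with fresh_until = now + 300s, while "no-store",
    // "private", or a zero TTL short-circuits to
    // RespCacheable::Uncacheable(NoCacheReason::OriginNotCache).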
1601
1602    /// Decide whether to serve stale content on error or during revalidation.
1603    ///
1604    /// This implements stale-while-revalidate and stale-if-error semantics.
1605    fn should_serve_stale(
1606        &self,
1607        _session: &mut Session,
1608        ctx: &mut Self::CTX,
1609        error: Option<&Error>,
1610    ) -> bool {
1611        let route_id = match ctx.route_id.as_deref() {
1612            Some(id) => id,
1613            None => return false,
1614        };
1615
1616        // Get route cache config for stale settings
1617        let config = match self.cache_manager.get_route_config(route_id) {
1618            Some(c) => c,
1619            None => return false,
1620        };
1621
1622        // If there's an upstream error, use stale-if-error
1623        if let Some(e) = error {
1624            // Only serve stale for upstream errors
1625            if e.esource() == &pingora::ErrorSource::Upstream {
1626                debug!(
1627                    correlation_id = %ctx.trace_id,
1628                    route_id = %route_id,
1629                    error = %e,
1630                    stale_if_error_secs = config.stale_if_error_secs,
1631                    "Considering stale-if-error"
1632                );
1633                return config.stale_if_error_secs > 0;
1634            }
1635        }
1636
1637        // During stale-while-revalidate (error is None)
1638        if error.is_none() && config.stale_while_revalidate_secs > 0 {
1639            trace!(
1640                correlation_id = %ctx.trace_id,
1641                route_id = %route_id,
1642                stale_while_revalidate_secs = config.stale_while_revalidate_secs,
1643                "Allowing stale-while-revalidate"
1644            );
1645            return true;
1646        }
1647
1648        false
1649    }
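
    // Stale-serving decision at a glance:
    //   upstream error  + stale_if_error_secs > 0         -> serve stale
    //   no error        + stale_while_revalidate_secs > 0 -> serve stale while revalidating
    //   any other case (non-upstream error, or both windows zero) -> false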
1650
1651    /// Handle Range header for byte-range requests (streaming support).
1652    ///
1653    /// This method is called when a Range header is present in the request.
1654    /// It allows proper handling of:
1655    /// - Video streaming (HTML5 video seeking)
1656    /// - Large file downloads with resume support
1657    /// - Partial content delivery
1658    ///
1659    /// Uses Pingora's built-in range handling with route-specific logging.
1660    fn range_header_filter(
1661        &self,
1662        session: &mut Session,
1663        response: &mut ResponseHeader,
1664        ctx: &mut Self::CTX,
1665    ) -> pingora_proxy::RangeType
1666    where
1667        Self::CTX: Send + Sync,
1668    {
1669        // Check if route supports range requests
1670        let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
1671            // Static file routes and media routes should support range requests
1672            matches!(
1673                config.service_type,
1674                sentinel_config::ServiceType::Static | sentinel_config::ServiceType::Web
1675            )
1676        });
1677
1678        if !supports_range {
1679            trace!(
1680                correlation_id = %ctx.trace_id,
1681                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1682                "Range request not supported for this route type"
1683            );
1684            return pingora_proxy::RangeType::None;
1685        }
1686
1687        // Use Pingora's built-in range header parsing and handling
1688        let range_type = pingora_proxy::range_header_filter(session.req_header(), response);
1689
1690        match &range_type {
1691            pingora_proxy::RangeType::None => {
1692                trace!(
1693                    correlation_id = %ctx.trace_id,
1694                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1695                    "No range request or not applicable"
1696                );
1697            }
1698            pingora_proxy::RangeType::Single(range) => {
1699                trace!(
1700                    correlation_id = %ctx.trace_id,
1701                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1702                    range_start = range.start,
1703                    range_end = range.end,
1704                    "Processing single-range request"
1705                );
1706            }
1707            pingora_proxy::RangeType::Multi(multi) => {
1708                trace!(
1709                    correlation_id = %ctx.trace_id,
1710                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1711                    range_count = multi.ranges.len(),
1712                    "Processing multi-range request"
1713                );
1714            }
1715            pingora_proxy::RangeType::Invalid => {
1716                debug!(
1717                    correlation_id = %ctx.trace_id,
1718                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1719                    "Invalid range header"
1720                );
1721            }
1722        }
1723
1724        range_type
1725    }
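
    // Example of what Pingora's range handling produces on a supported route:
    //   request : GET /video.mp4 with "Range: bytes=0-1023"
    //   response: 206 Partial Content with "Content-Range: bytes 0-1023/<total>"
    // Requests without a Range header fall through as RangeType::None; unsatisfiable
    // or malformed ranges surface as RangeType::Invalid and are logged above.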
1726
1727    /// Handle fatal proxy errors by generating custom error pages.
1728    /// Called when the proxy itself fails to process the request.
1729    async fn fail_to_proxy(
1730        &self,
1731        _session: &mut Session,
1732        e: &Error,
1733        ctx: &mut Self::CTX,
1734    ) -> pingora_proxy::FailToProxy
1735    where
1736        Self::CTX: Send + Sync,
1737    {
1738        let error_code = match e.etype() {
1739            // Connection errors
1740            ErrorType::ConnectRefused => 503,
1741            ErrorType::ConnectTimedout => 504,
1742            ErrorType::ConnectNoRoute => 502,
1743
1744            // Timeout errors
1745            ErrorType::ReadTimedout => 504,
1746            ErrorType::WriteTimedout => 504,
1747
1748            // TLS errors
1749            ErrorType::TLSHandshakeFailure => 502,
1750            ErrorType::InvalidCert => 502,
1751
1752            // Protocol errors
1753            ErrorType::InvalidHTTPHeader => 400,
1754            ErrorType::H2Error => 502,
1755
1756            // Other connection failures
1757            ErrorType::ConnectProxyFailure => 502,
1758            ErrorType::ConnectionClosed => 502,
1759
1760            // Internal errors
1761            ErrorType::InternalError => 500,
1762
1763            // Default to 502 for unknown errors
1764            _ => 502,
1765        };
1766
1767        error!(
1768            correlation_id = %ctx.trace_id,
1769            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1770            upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
1771            error_type = ?e.etype(),
1772            error = %e,
1773            error_code = error_code,
1774            "Proxy error occurred"
1775        );
1776
1777        // Record the error in metrics
1778        self.metrics
1779            .record_blocked_request(&format!("proxy_error_{}", error_code));
1780
1781        // Return the error response info
1782        // can_reuse_downstream: allow connection reuse for client errors, not for server errors
1783        pingora_proxy::FailToProxy {
1784            error_code,
1785            can_reuse_downstream: error_code < 500,
1786        }
1787    }
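
    // For instance, an upstream that refuses the TCP connection surfaces as
    // ErrorType::ConnectRefused and the client receives a 503, while a connect timeout
    // maps to 504. Since both are >= 500, can_reuse_downstream is false and the
    // downstream connection is not kept alive after the error response.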
1788
1789    /// Handle errors that occur during proxying after upstream connection is established.
1790    ///
1791    /// This method enables retry logic and circuit breaker integration.
1792    /// It's called when an error occurs during the request/response exchange
1793    /// with the upstream server.
1794    fn error_while_proxy(
1795        &self,
1796        peer: &HttpPeer,
1797        session: &mut Session,
1798        e: Box<Error>,
1799        ctx: &mut Self::CTX,
1800        client_reused: bool,
1801    ) -> Box<Error> {
1802        let error_type = e.etype().clone();
1803        let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
1804
1805        // Classify error for retry decisions
1806        let is_retryable = matches!(
1807            error_type,
1808            ErrorType::ConnectTimedout
1809                | ErrorType::ReadTimedout
1810                | ErrorType::WriteTimedout
1811                | ErrorType::ConnectionClosed
1812                | ErrorType::ConnectRefused
1813        );
1814
1815        // Log the error with context
1816        warn!(
1817            correlation_id = %ctx.trace_id,
1818            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1819            upstream = %upstream_id,
1820            peer_address = %peer.address(),
1821            error_type = ?error_type,
1822            error = %e,
1823            client_reused = client_reused,
1824            is_retryable = is_retryable,
1825            "Error during proxy operation"
1826        );
1827
1828        // Record failure with circuit breaker via upstream pool
1829        // This is done asynchronously since we can't await in a sync fn
1830        let peer_address = peer.address().to_string();
1831        let upstream_pools = self.upstream_pools.clone();
1832        let upstream_id_owned = upstream_id.to_string();
1833        tokio::spawn(async move {
1834            if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
1835                pool.report_result(&peer_address, false).await;
1836            }
1837        });
1838
1839        // Metrics tracking
1840        self.metrics
1841            .record_blocked_request(&format!("proxy_error_{:?}", error_type));
1842
1843        // Create enhanced error with retry information
1844        let mut enhanced_error = e.more_context(format!(
1845            "Upstream: {}, Peer: {}, Attempts: {}",
1846            upstream_id,
1847            peer.address(),
1848            ctx.upstream_attempts
1849        ));
1850
1851        // Determine if retry should be attempted:
1852        // - Only retry if error is retryable type
1853        // - Only retry reused connections if buffer isn't truncated
1854        // - Track retry metrics
1855        if is_retryable {
1856            let can_retry = if client_reused {
1857                // For reused connections, check if retry buffer is intact
1858                !session.as_ref().retry_buffer_truncated()
1859            } else {
1860                // Fresh connections can always retry
1861                true
1862            };
1863
1864            enhanced_error.retry.decide_reuse(can_retry);
1865
1866            if can_retry {
1867                debug!(
1868                    correlation_id = %ctx.trace_id,
1869                    upstream = %upstream_id,
1870                    error_type = ?error_type,
1871                    "Error is retryable, will attempt retry"
1872                );
1873            }
1874        } else {
1875            // Non-retryable error - don't retry
1876            enhanced_error.retry.decide_reuse(false);
1877        }
1878
1879        enhanced_error
1880    }
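
    // Retry decision in brief:
    //   retryable error + fresh connection                     -> retry allowed
    //   retryable error + reused connection, buffer intact     -> retry allowed
    //   retryable error + reused connection, buffer truncated  -> no retry
    //   non-retryable error                                     -> no retry
    // Independently, the failure is reported off-task to the upstream pool so its
    // circuit breaker can react.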
1881
1882    async fn logging(&self, session: &mut Session, error: Option<&Error>, ctx: &mut Self::CTX) {
1883        // Decrement active requests
1884        self.reload_coordinator.dec_requests();
1885
1886        let duration = ctx.elapsed();
1887
1888        // Get response status
1889        let status = session
1890            .response_written()
1891            .map(|r| r.status.as_u16())
1892            .unwrap_or(0);
1893
1894        // Write to access log file if configured
1895        if self.log_manager.access_log_enabled() {
1896            let access_entry = AccessLogEntry {
1897                timestamp: chrono::Utc::now().to_rfc3339(),
1898                trace_id: ctx.trace_id.clone(),
1899                method: ctx.method.clone(),
1900                path: ctx.path.clone(),
1901                query: ctx.query.clone(),
1902                protocol: "HTTP/1.1".to_string(),
1903                status,
1904                body_bytes: ctx.response_bytes,
1905                duration_ms: duration.as_millis() as u64,
1906                client_ip: ctx.client_ip.clone(),
1907                user_agent: ctx.user_agent.clone(),
1908                referer: ctx.referer.clone(),
1909                host: ctx.host.clone(),
1910                route_id: ctx.route_id.clone(),
1911                upstream: ctx.upstream.clone(),
1912                upstream_attempts: ctx.upstream_attempts,
1913                instance_id: self.app_state.instance_id.clone(),
1914            };
1915            self.log_manager.log_access(&access_entry);
1916        }
1917
1918        // Log to tracing at debug level (avoid allocations if debug disabled)
1919        if tracing::enabled!(tracing::Level::DEBUG) {
1920            debug!(
1921                trace_id = %ctx.trace_id,
1922                method = %ctx.method,
1923                path = %ctx.path,
1924                route_id = ?ctx.route_id,
1925                upstream = ?ctx.upstream,
1926                status = status,
1927                duration_ms = duration.as_millis() as u64,
1928                upstream_attempts = ctx.upstream_attempts,
1929                error = ?error.map(|e| e.to_string()),
1930                "Request completed"
1931            );
1932        }
1933
1934        // Log WebSocket upgrades at info level
1935        if ctx.is_websocket_upgrade && status == 101 {
1936            info!(
1937                trace_id = %ctx.trace_id,
1938                route_id = ?ctx.route_id,
1939                upstream = ?ctx.upstream,
1940                client_ip = %ctx.client_ip,
1941                "WebSocket connection established"
1942            );
1943        }
1944    }
1945}
1946
1947// =============================================================================
1948// Helper methods for body streaming (not part of ProxyHttp trait)
1949// =============================================================================
1950
1951impl SentinelProxy {
1952    /// Process a single body chunk in streaming mode.
1953    async fn process_body_chunk_streaming(
1954        &self,
1955        body: &mut Option<Bytes>,
1956        end_of_stream: bool,
1957        ctx: &mut RequestContext,
1958    ) -> Result<(), Box<Error>> {
1959        // Clone the chunk data to avoid borrowing issues when mutating body later
1960        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
1961        let chunk_index = ctx.request_body_chunk_index;
1962        ctx.request_body_chunk_index += 1;
1963        ctx.body_bytes_inspected += chunk_data.len() as u64;
1964
1965        debug!(
1966            correlation_id = %ctx.trace_id,
1967            chunk_index = chunk_index,
1968            chunk_size = chunk_data.len(),
1969            end_of_stream = end_of_stream,
1970            "Streaming body chunk to agents"
1971        );
1972
1973        // Create agent call context
1974        let agent_ctx = crate::agents::AgentCallContext {
1975            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
1976            metadata: sentinel_agent_protocol::RequestMetadata {
1977                correlation_id: ctx.trace_id.clone(),
1978                request_id: ctx.trace_id.clone(),
1979                client_ip: ctx.client_ip.clone(),
1980                client_port: 0,
1981                server_name: ctx.host.clone(),
1982                protocol: "HTTP/1.1".to_string(),
1983                tls_version: None,
1984                tls_cipher: None,
1985                route_id: ctx.route_id.clone(),
1986                upstream_id: ctx.upstream.clone(),
1987                timestamp: chrono::Utc::now().to_rfc3339(),
1988            },
1989            route_id: ctx.route_id.clone(),
1990            upstream_id: ctx.upstream.clone(),
1991            request_body: None, // Not used in streaming mode
1992            response_body: None,
1993        };
1994
1995        let agent_ids = ctx.body_inspection_agents.clone();
1996        let total_size = None; // Unknown in streaming mode
1997
1998        match self
1999            .agent_manager
2000            .process_request_body_streaming(
2001                &agent_ctx,
2002                &chunk_data,
2003                end_of_stream,
2004                chunk_index,
2005                ctx.body_bytes_inspected as usize,
2006                total_size,
2007                &agent_ids,
2008            )
2009            .await
2010        {
2011            Ok(decision) => {
2012                // Track if agent needs more data
2013                ctx.agent_needs_more = decision.needs_more;
2014
2015                // Apply body mutation if present
2016                if let Some(ref mutation) = decision.request_body_mutation {
2017                    if !mutation.is_pass_through() {
2018                        if mutation.is_drop() {
2019                            // Drop the chunk
2020                            *body = None;
2021                            trace!(
2022                                correlation_id = %ctx.trace_id,
2023                                chunk_index = chunk_index,
2024                                "Agent dropped body chunk"
2025                            );
2026                        } else if let Some(ref new_data) = mutation.data {
2027                            // Replace chunk with mutated content
2028                            *body = Some(Bytes::from(new_data.clone()));
2029                            trace!(
2030                                correlation_id = %ctx.trace_id,
2031                                chunk_index = chunk_index,
2032                                original_size = chunk_data.len(),
2033                                new_size = new_data.len(),
2034                                "Agent mutated body chunk"
2035                            );
2036                        }
2037                    }
2038                }
2039
2040                // Check decision (only final if needs_more is false)
2041                if !decision.needs_more && !decision.is_allow() {
2042                    warn!(
2043                        correlation_id = %ctx.trace_id,
2044                        action = ?decision.action,
2045                        "Agent blocked request body"
2046                    );
2047                    self.metrics.record_blocked_request("agent_body_inspection");
2048
2049                    let (status, message) = match &decision.action {
2050                        crate::agents::AgentAction::Block { status, body, .. } => (
2051                            *status,
2052                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2053                        ),
2054                        _ => (403, "Forbidden".to_string()),
2055                    };
2056
2057                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2058                }
2059
2060                trace!(
2061                    correlation_id = %ctx.trace_id,
2062                    needs_more = decision.needs_more,
2063                    "Agent processed body chunk"
2064                );
2065            }
2066            Err(e) => {
2067                let fail_closed = ctx
2068                    .route_config
2069                    .as_ref()
2070                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2071                    .unwrap_or(false);
2072
2073                if fail_closed {
2074                    error!(
2075                        correlation_id = %ctx.trace_id,
2076                        error = %e,
2077                        "Agent streaming body inspection failed, blocking (fail-closed)"
2078                    );
2079                    return Err(Error::explain(
2080                        ErrorType::HTTPStatus(503),
2081                        "Service unavailable",
2082                    ));
2083                } else {
2084                    warn!(
2085                        correlation_id = %ctx.trace_id,
2086                        error = %e,
2087                        "Agent streaming body inspection failed, allowing (fail-open)"
2088                    );
2089                }
2090            }
2091        }
2092
2093        Ok(())
2094    }
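
    // Streaming inspection at a glance: each chunk goes to the configured agents with
    // its index and running byte count; the returned decision can pass the chunk
    // through, replace it with mutation data, drop it, or block the request once
    // needs_more is false. Agent errors fall back to the route's failure_mode
    // (fail-open vs fail-closed).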
2095
2096    /// Send buffered body to agents (buffer mode).
2097    async fn send_buffered_body_to_agents(
2098        &self,
2099        end_of_stream: bool,
2100        ctx: &mut RequestContext,
2101    ) -> Result<(), Box<Error>> {
2102        debug!(
2103            correlation_id = %ctx.trace_id,
2104            buffer_size = ctx.body_buffer.len(),
2105            end_of_stream = end_of_stream,
2106            agent_count = ctx.body_inspection_agents.len(),
2107            "Sending buffered body to agents for inspection"
2108        );
2109
2110        let agent_ctx = crate::agents::AgentCallContext {
2111            correlation_id: sentinel_common::CorrelationId::from_string(&ctx.trace_id),
2112            metadata: sentinel_agent_protocol::RequestMetadata {
2113                correlation_id: ctx.trace_id.clone(),
2114                request_id: ctx.trace_id.clone(),
2115                client_ip: ctx.client_ip.clone(),
2116                client_port: 0,
2117                server_name: ctx.host.clone(),
2118                protocol: "HTTP/1.1".to_string(),
2119                tls_version: None,
2120                tls_cipher: None,
2121                route_id: ctx.route_id.clone(),
2122                upstream_id: ctx.upstream.clone(),
2123                timestamp: chrono::Utc::now().to_rfc3339(),
2124            },
2125            route_id: ctx.route_id.clone(),
2126            upstream_id: ctx.upstream.clone(),
2127            request_body: Some(ctx.body_buffer.clone()),
2128            response_body: None,
2129        };
2130
2131        let agent_ids = ctx.body_inspection_agents.clone();
2132        match self
2133            .agent_manager
2134            .process_request_body(&agent_ctx, &ctx.body_buffer, end_of_stream, &agent_ids)
2135            .await
2136        {
2137            Ok(decision) => {
2138                if !decision.is_allow() {
2139                    warn!(
2140                        correlation_id = %ctx.trace_id,
2141                        action = ?decision.action,
2142                        "Agent blocked request body"
2143                    );
2144                    self.metrics.record_blocked_request("agent_body_inspection");
2145
2146                    let (status, message) = match &decision.action {
2147                        crate::agents::AgentAction::Block { status, body, .. } => (
2148                            *status,
2149                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
2150                        ),
2151                        _ => (403, "Forbidden".to_string()),
2152                    };
2153
2154                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
2155                }
2156
2157                trace!(
2158                    correlation_id = %ctx.trace_id,
2159                    "Agent allowed request body"
2160                );
2161            }
2162            Err(e) => {
2163                let fail_closed = ctx
2164                    .route_config
2165                    .as_ref()
2166                    .map(|r| r.policies.failure_mode == sentinel_config::FailureMode::Closed)
2167                    .unwrap_or(false);
2168
2169                if fail_closed {
2170                    error!(
2171                        correlation_id = %ctx.trace_id,
2172                        error = %e,
2173                        "Agent body inspection failed, blocking (fail-closed)"
2174                    );
2175                    return Err(Error::explain(
2176                        ErrorType::HTTPStatus(503),
2177                        "Service unavailable",
2178                    ));
2179                } else {
2180                    warn!(
2181                        correlation_id = %ctx.trace_id,
2182                        error = %e,
2183                        "Agent body inspection failed, allowing (fail-open)"
2184                    );
2185                }
2186            }
2187        }
2188
2189        Ok(())
2190    }
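
    // Buffer mode trades memory for a single agent round trip: the whole request body
    // (ctx.body_buffer) is sent at once, so agents see the complete payload, but large
    // uploads are held in memory. Streaming mode above sends per-chunk and supports
    // incremental verdicts via needs_more.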
2191}