1use async_trait::async_trait;
7use bytes::Bytes;
8use pingora::http::ResponseHeader;
9use pingora::prelude::*;
10use pingora::protocols::Digest;
11use pingora::proxy::{ProxyHttp, Session};
12use pingora::upstreams::peer::Peer;
13use pingora_cache::{
14 CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable,
15};
16use pingora_timeout::sleep;
17use std::os::unix::io::RawFd;
18use std::time::Duration;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::cache::{get_cache_eviction, get_cache_lock, get_cache_storage};
22use crate::disk_cache::DiskHitHandler;
23use crate::hybrid_cache::HybridHitHandler;
24use crate::inference::{
25 extract_inference_content, is_sse_response, PromptInjectionResult, StreamingTokenCounter,
26};
27use crate::logging::{AccessLogEntry, AuditEventType, AuditLogEntry};
28use crate::rate_limit::HeaderAccessor;
29use crate::routing::RequestInfo;
30
31use super::context::{FallbackReason, RequestContext};
32use super::fallback::FallbackEvaluator;
33use super::fallback_metrics::get_fallback_metrics;
34use super::model_routing;
35use super::model_routing_metrics::get_model_routing_metrics;
36use super::GrapsusProxy;
37
/// Marker type implementing [`HeaderAccessor`] over an empty header set:
/// every lookup reports the header as absent.
///
/// It is never instantiated — it only appears as a type parameter
/// (`Option::<&NoHeaderAccessor>::None`) when the rate limiter is invoked
/// without a header source, effectively disabling header-based keying for
/// that check.
struct NoHeaderAccessor;
impl HeaderAccessor for NoHeaderAccessor {
    // No backing header map exists, so nothing can ever be found.
    fn get_header(&self, _name: &str) -> Option<String> {
        None
    }
}
45
46#[async_trait]
47impl ProxyHttp for GrapsusProxy {
48 type CTX = RequestContext;
49
50 fn new_ctx(&self) -> Self::CTX {
51 RequestContext::new()
52 }
53
54 fn fail_to_connect(
55 &self,
56 _session: &mut Session,
57 peer: &HttpPeer,
58 ctx: &mut Self::CTX,
59 e: Box<Error>,
60 ) -> Box<Error> {
61 error!(
62 correlation_id = %ctx.trace_id,
63 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
64 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
65 peer_address = %peer.address(),
66 error = %e,
67 "Failed to connect to upstream peer"
68 );
69 self.log_manager.log_request_error(
70 "error",
71 "Failed to connect to upstream peer",
72 &ctx.trace_id,
73 ctx.route_id.as_deref(),
74 ctx.upstream.as_deref(),
75 Some(format!("peer={} error={}", peer.address(), e)),
76 );
77 e
79 }
80
    /// Earliest request hook, run before the regular filter/proxy phases.
    ///
    /// Responsibilities, in order:
    /// 1. Serve ACME HTTP-01 challenge responses directly (never proxied).
    /// 2. Capture method/path/host into the context.
    /// 3. Perform an initial route match; on a hit, cache the match and
    ///    initialize trace id, W3C trace context, and the OTel span.
    /// 4. Dispatch builtin routes to `handle_builtin_route`.
    ///
    /// Returning `Err` here is used as a short-circuit signal after a
    /// response has already been written (ACME, completed builtin handlers).
    /// NOTE(review): this relies on Pingora not writing a second response
    /// for an already-responded session — confirm against the error-handling
    /// path.
    async fn early_request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let req_header = session.req_header();
        let method = req_header.method.as_str();
        let path = req_header.uri.path();
        // Missing/non-UTF-8 Host header degrades to "" rather than failing.
        let host = req_header
            .headers
            .get("host")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("");

        // --- ACME HTTP-01 challenge serving (only when ACME is configured) ---
        if let Some(ref challenge_manager) = self.acme_challenges {
            if let Some(token) = crate::acme::ChallengeManager::extract_token(path) {
                if let Some(key_authorization) = challenge_manager.get_response(token) {
                    debug!(
                        token = %token,
                        "Serving ACME HTTP-01 challenge response"
                    );

                    let mut resp = ResponseHeader::build(200, None)?;
                    resp.insert_header("Content-Type", "text/plain")?;
                    resp.insert_header("Content-Length", key_authorization.len().to_string())?;

                    // Write header then body (end_of_stream = true) directly
                    // to the downstream session; no upstream is involved.
                    session.write_response_header(Box::new(resp), false).await?;
                    session
                        .write_response_body(Some(Bytes::from(key_authorization)), true)
                        .await?;

                    // Sentinel error: stop the pipeline — the response is done.
                    return Err(Error::explain(
                        ErrorType::InternalError,
                        "ACME challenge served",
                    ));
                } else {
                    // Path looked like a challenge URL but the token is not
                    // ours/expired; fall through to normal routing.
                    warn!(
                        token = %token,
                        "ACME challenge token not found"
                    );
                }
            }
        }

        // Persist request identity for later phases.
        ctx.method = method.to_string();
        ctx.path = path.to_string();
        ctx.host = Some(host.to_string());

        // Initial route match under a read lock. On no match we return Ok:
        // `upstream_peer` performs a second, richer match (headers/query) and
        // owns the "no route" error.
        let route_match = {
            let route_matcher = self.route_matcher.read();
            let request_info = RequestInfo::new(method, path, host);
            match route_matcher.match_request(&request_info) {
                Some(m) => m,
                None => return Ok(()),
            }
        };

        // Cache the match so upstream_peer can skip re-matching.
        ctx.trace_id = self.get_trace_id(session);
        ctx.route_id = Some(route_match.route_id.to_string());
        ctx.route_config = Some(route_match.config.clone());

        // Propagate an incoming W3C traceparent header, if parseable.
        if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
            if let Ok(s) = traceparent.to_str() {
                ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
            }
        }

        // Start an OTel span only when a tracer is configured.
        if let Some(tracer) = crate::otel::get_tracer() {
            ctx.otel_span = Some(tracer.start_span(method, path, ctx.trace_context.as_ref()));
        }

        // Builtin routes are handled entirely in-process, here.
        if route_match.config.service_type == grapsus_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Handling builtin route in early_request_filter"
            );

            let handled = self
                .handle_builtin_route(session, ctx, &route_match)
                .await?;

            if handled {
                // Sentinel error: the builtin handler already responded.
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Builtin handler complete",
                ));
            }
        }

        Ok(())
    }
188
    /// Selects the upstream peer for this request.
    ///
    /// Phases, in order:
    /// 1. Backfill context (config snapshot, client IP, request identity,
    ///    user-agent/referer).
    /// 2. Resolve the route: reuse the match cached by `early_request_filter`
    ///    or perform a full match (optionally with headers/query params).
    /// 3. Short-circuit builtin/static routes with sentinel `Err`s — those
    ///    are served without an upstream.
    /// 4. Apply model-based routing for inference routes, else the route's
    ///    configured upstream.
    /// 5. Evaluate pre-request fallback (health / token-budget / model
    ///    mapping).
    /// 6. Look up the upstream pool and select a peer, retrying with
    ///    exponential backoff up to the route's `max_attempts`.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<Error>> {
        // NOTE(review): presumably tracks in-flight requests so config
        // reloads can drain — confirm against ReloadCoordinator.
        self.reload_coordinator.inc_requests();

        // Pin a config snapshot for the lifetime of this request.
        if ctx.config.is_none() {
            ctx.config = Some(self.config_manager.current());
        }

        if ctx.client_ip.is_empty() {
            ctx.client_ip = session
                .client_addr()
                .map(|a| a.to_string())
                .unwrap_or_else(|| "unknown".to_string());
        }

        let req_header = session.req_header();

        // Empty method means early_request_filter did not match a route
        // (it returns before populating ctx on a miss) — backfill here.
        if ctx.method.is_empty() {
            ctx.method = req_header.method.to_string();
            ctx.path = req_header.uri.path().to_string();
            ctx.query = req_header.uri.query().map(|q| q.to_string());
            ctx.host = req_header
                .headers
                .get("host")
                .and_then(|v| v.to_str().ok())
                .map(|s| s.to_string());
        }
        ctx.user_agent = req_header
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        ctx.referer = req_header
            .headers
            .get("referer")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());

        trace!(
            correlation_id = %ctx.trace_id,
            client_ip = %ctx.client_ip,
            "Request received, initializing context"
        );

        // --- Route resolution -------------------------------------------
        // Fast path: reuse the match cached by early_request_filter.
        let route_match = if let Some(ref route_config) = ctx.route_config {
            let route_id = ctx.route_id.as_deref().unwrap_or("");
            crate::routing::RouteMatch {
                route_id: grapsus_common::RouteId::new(route_id),
                config: route_config.clone(),
            }
        } else {
            // Slow path: full match, including headers/query params when the
            // matcher's rules require them.
            let (match_result, route_duration) = {
                let route_matcher = self.route_matcher.read();
                let host = ctx.host.as_deref().unwrap_or("");

                let mut request_info = RequestInfo::new(&ctx.method, &ctx.path, host);

                if route_matcher.needs_headers() {
                    request_info = request_info
                        .with_headers(RequestInfo::build_headers(req_header.headers.iter()));
                }

                if route_matcher.needs_query_params() {
                    // NOTE(review): parses query params from ctx.path — if
                    // path excludes the query string (ctx.query is stored
                    // separately above), this may always be empty; confirm
                    // parse_query_params' expected input.
                    request_info =
                        request_info.with_query_params(RequestInfo::parse_query_params(&ctx.path));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    method = %request_info.method,
                    path = %request_info.path,
                    host = %request_info.host,
                    "Built request info for route matching"
                );

                let route_start = std::time::Instant::now();
                // No route at all → log + structured error; request fails.
                let route_match = route_matcher.match_request(&request_info).ok_or_else(|| {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        method = %request_info.method,
                        path = %request_info.path,
                        host = %request_info.host,
                        "No matching route found for request"
                    );
                    self.log_manager.log_request_error(
                        "warn",
                        "No matching route found for request",
                        &ctx.trace_id,
                        None,
                        None,
                        Some(format!(
                            "method={} path={} host={}",
                            request_info.method, request_info.path, request_info.host
                        )),
                    );
                    Error::explain(ErrorType::InternalError, "No matching route found")
                })?;
                let route_duration = route_start.elapsed();
                (route_match, route_duration)
            };

            ctx.route_id = Some(match_result.route_id.to_string());
            ctx.route_config = Some(match_result.config.clone());

            // Tracing was not initialized on this path either; do it now.
            if ctx.trace_id.is_empty() {
                ctx.trace_id = self.get_trace_id(session);

                if let Some(traceparent) = req_header.headers.get(crate::otel::TRACEPARENT_HEADER) {
                    if let Ok(s) = traceparent.to_str() {
                        ctx.trace_context = crate::otel::TraceContext::parse_traceparent(s);
                    }
                }

                if let Some(tracer) = crate::otel::get_tracer() {
                    ctx.otel_span =
                        Some(tracer.start_span(&ctx.method, &ctx.path, ctx.trace_context.as_ref()));
                }
            }

            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %match_result.route_id,
                route_duration_us = route_duration.as_micros(),
                service_type = ?match_result.config.service_type,
                "Route matched"
            );
            match_result
        };

        // Builtin routes never reach an upstream; signal via sentinel error.
        // (Normally handled in early_request_filter; this is the backstop.)
        if route_match.config.service_type == grapsus_config::ServiceType::Builtin {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                builtin_handler = ?route_match.config.builtin_handler,
                "Route type is builtin, skipping upstream"
            );
            ctx.upstream = Some(format!("_builtin_{}", route_match.route_id));
            return Err(Error::explain(
                ErrorType::InternalError,
                "Builtin handler handled in request_filter",
            ));
        }

        // Static routes with a registered static server are likewise served
        // without an upstream.
        if route_match.config.service_type == grapsus_config::ServiceType::Static {
            trace!(
                correlation_id = %ctx.trace_id,
                route_id = %route_match.route_id,
                "Route type is static, checking for static server"
            );
            if self
                .static_servers
                .get(route_match.route_id.as_str())
                .await
                .is_some()
            {
                ctx.upstream = Some(format!("_static_{}", route_match.route_id));
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    path = %ctx.path,
                    "Serving static file"
                );
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Static file serving handled in request_filter",
                ));
            }
        }

        // --- Model-based routing (inference routes only) -----------------
        let mut model_routing_applied = false;
        if let Some(ref inference) = route_match.config.inference {
            if let Some(ref model_routing) = inference.model_routing {
                let model = model_routing::extract_model_from_headers(&req_header.headers);

                if let Some(ref model_name) = model {
                    // Model header present: look up the upstream mapped to it.
                    if let Some(routing_result) =
                        model_routing::find_upstream_for_model(model_routing, model_name)
                    {
                        debug!(
                            correlation_id = %ctx.trace_id,
                            route_id = %route_match.route_id,
                            model = %model_name,
                            upstream = %routing_result.upstream,
                            is_default = routing_result.is_default,
                            provider_override = ?routing_result.provider,
                            "Model-based routing selected upstream"
                        );

                        // NOTE(review): assumed to set ctx.upstream — the
                        // unwrap in the fallback block below depends on it;
                        // confirm in RequestContext::record_model_routing.
                        ctx.record_model_routing(
                            &routing_result.upstream,
                            Some(model_name.clone()),
                            routing_result.provider,
                        );
                        model_routing_applied = true;

                        if let Some(metrics) = get_model_routing_metrics() {
                            metrics.record_model_routed(
                                route_match.route_id.as_str(),
                                model_name,
                                &routing_result.upstream,
                            );
                            if routing_result.is_default {
                                metrics.record_default_upstream(route_match.route_id.as_str());
                            }
                            if let Some(provider) = routing_result.provider {
                                metrics.record_provider_override(
                                    route_match.route_id.as_str(),
                                    &routing_result.upstream,
                                    provider.as_str(),
                                );
                            }
                        }
                    }
                    // Note: a model header with no mapping and no fall-through
                    // leaves model_routing_applied = false → the route's
                    // configured upstream is used below.
                } else if let Some(ref default_upstream) = model_routing.default_upstream {
                    // No model header: use the explicit default upstream.
                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = %route_match.route_id,
                        upstream = %default_upstream,
                        "Model-based routing using default upstream (no model header)"
                    );
                    ctx.record_model_routing(default_upstream, None, None);
                    model_routing_applied = true;

                    if let Some(metrics) = get_model_routing_metrics() {
                        metrics.record_no_model_header(route_match.route_id.as_str());
                    }
                }
            }
        }

        // Plain routing: the route's statically configured upstream.
        if !model_routing_applied {
            if let Some(ref upstream) = route_match.config.upstream {
                ctx.upstream = Some(upstream.clone());
                trace!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    upstream = %upstream,
                    "Upstream configured for route"
                );
            } else {
                // Misconfiguration: proxy route without an upstream.
                error!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    "Route has no upstream configured"
                );
                return Err(Error::explain(
                    ErrorType::InternalError,
                    format!(
                        "Route '{}' has no upstream configured",
                        route_match.route_id
                    ),
                ));
            }
        }

        // --- Pre-request fallback evaluation -----------------------------
        if let Some(ref fallback_config) = route_match.config.fallback {
            // Safe: every path above either set ctx.upstream or returned Err
            // (see NOTE on record_model_routing above).
            let upstream_name = ctx.upstream.as_ref().unwrap();

            // Missing pool counts as unhealthy.
            let is_healthy = if let Some(pool) = self.upstream_pools.get(upstream_name).await {
                pool.has_healthy_targets().await
            } else {
                false
            };

            let is_budget_exhausted = ctx.inference_budget_exhausted;

            let current_model = ctx.inference_model.as_deref();

            let evaluator = FallbackEvaluator::new(
                fallback_config,
                ctx.tried_upstreams(),
                ctx.fallback_attempt,
            );

            if let Some(decision) = evaluator.should_fallback_before_request(
                upstream_name,
                is_healthy,
                is_budget_exhausted,
                current_model,
            ) {
                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = %route_match.route_id,
                    from_upstream = %upstream_name,
                    to_upstream = %decision.next_upstream,
                    reason = %decision.reason,
                    fallback_attempt = ctx.fallback_attempt + 1,
                    "Triggering fallback routing"
                );

                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_attempt(
                        route_match.route_id.as_str(),
                        upstream_name,
                        &decision.next_upstream,
                        &decision.reason,
                    );
                }

                // Switches ctx.upstream to the fallback target and records
                // the attempt in the context.
                ctx.record_fallback(decision.reason, &decision.next_upstream);

                // Some fallbacks also remap the requested model name.
                if let Some((original, mapped)) = decision.model_mapping {
                    if let Some(metrics) = get_fallback_metrics() {
                        metrics.record_model_mapping(
                            route_match.route_id.as_str(),
                            &original,
                            &mapped,
                        );
                    }

                    ctx.record_model_mapping(original, mapped);
                    trace!(
                        correlation_id = %ctx.trace_id,
                        original_model = ?ctx.model_mapping_applied().map(|m| &m.0),
                        mapped_model = ?ctx.model_mapping_applied().map(|m| &m.1),
                        "Applied model mapping for fallback"
                    );
                }
            }
        }

        debug!(
            correlation_id = %ctx.trace_id,
            route_id = %route_match.route_id,
            upstream = ?ctx.upstream,
            method = %req_header.method,
            path = %req_header.uri.path(),
            host = ctx.host.as_deref().unwrap_or("-"),
            client_ip = %ctx.client_ip,
            "Processing request"
        );

        // Defensive: a "_static_" pseudo-upstream should never reach peer
        // selection (handled earlier / in request_filter).
        if ctx
            .upstream
            .as_ref()
            .is_some_and(|u| u.starts_with("_static_"))
        {
            return Err(Error::explain(
                ErrorType::InternalError,
                "Static route should be handled in request_filter",
            ));
        }

        let upstream_name = ctx
            .upstream
            .as_ref()
            .ok_or_else(|| Error::explain(ErrorType::InternalError, "No upstream configured"))?;

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            "Looking up upstream pool"
        );

        // --- Pool lookup --------------------------------------------------
        let pool = self
            .upstream_pools
            .get(upstream_name)
            .await
            .ok_or_else(|| {
                error!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_name,
                    "Upstream pool not found"
                );
                self.log_manager.log_request_error(
                    "error",
                    "Upstream pool not found",
                    &ctx.trace_id,
                    ctx.route_id.as_deref(),
                    Some(upstream_name),
                    None,
                );
                Error::explain(
                    ErrorType::InternalError,
                    format!("Upstream pool '{}' not found", upstream_name),
                )
            })?;

        // Default to a single attempt when no retry policy is configured.
        let max_retries = route_match
            .config
            .retry_policy
            .as_ref()
            .map(|r| r.max_attempts)
            .unwrap_or(1);

        trace!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            max_retries = max_retries,
            "Starting upstream peer selection"
        );

        let mut last_error = None;
        let selection_start = std::time::Instant::now();

        // --- Peer selection with retry + exponential backoff --------------
        for attempt in 1..=max_retries {
            ctx.upstream_attempts = attempt;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream_name,
                attempt = attempt,
                max_retries = max_retries,
                "Attempting to select upstream peer"
            );

            match pool.select_peer_with_metadata(None).await {
                Ok((mut peer, metadata)) => {
                    let selection_duration = selection_start.elapsed();
                    let peer_addr = peer.address().to_string();
                    ctx.selected_upstream_address = Some(peer_addr.clone());

                    // Sticky-session bookkeeping: a new assignment means the
                    // response phase must emit a Set-Cookie header.
                    if metadata.contains_key("sticky_session_new") {
                        ctx.sticky_session_new_assignment = true;
                        ctx.sticky_session_set_cookie =
                            metadata.get("sticky_set_cookie_header").cloned();
                        ctx.sticky_target_index = metadata
                            .get("sticky_target_index")
                            .and_then(|s| s.parse().ok());

                        trace!(
                            correlation_id = %ctx.trace_id,
                            sticky_target_index = ?ctx.sticky_target_index,
                            "New sticky session assignment, will set cookie"
                        );
                    }

                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        peer_address = %peer_addr,
                        attempt = attempt,
                        selection_duration_us = selection_duration.as_micros(),
                        sticky_session_hit = metadata.contains_key("sticky_session_hit"),
                        sticky_session_new = ctx.sticky_session_new_assignment,
                        "Selected upstream peer"
                    );
                    // Route-level timeout policy applies first...
                    if let Some(ref rc) = ctx.route_config {
                        if let Some(timeout_secs) = rc.policies.timeout_secs {
                            peer.options.read_timeout = Some(Duration::from_secs(timeout_secs));
                        }
                    }

                    // ...then per-request filter overrides win (the upstream
                    // override intentionally replaces the route read_timeout).
                    if let Some(connect_secs) = ctx.filter_connect_timeout_secs {
                        peer.options.connection_timeout = Some(Duration::from_secs(connect_secs));
                    }
                    if let Some(upstream_secs) = ctx.filter_upstream_timeout_secs {
                        peer.options.read_timeout = Some(Duration::from_secs(upstream_secs));
                    }

                    return Ok(Box::new(peer));
                }
                Err(e) => {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_name,
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Failed to select upstream peer"
                    );
                    last_error = Some(e);

                    // Exponential backoff: 100ms, 200ms, 400ms, ...
                    // (doubles per attempt; no cap beyond max_retries).
                    if attempt < max_retries {
                        let backoff = Duration::from_millis(100 * 2_u64.pow(attempt - 1));
                        trace!(
                            correlation_id = %ctx.trace_id,
                            backoff_ms = backoff.as_millis(),
                            "Backing off before retry"
                        );
                        sleep(backoff).await;
                    }
                }
            }
        }

        // All attempts exhausted: log, record metrics, and fail the request.
        let selection_duration = selection_start.elapsed();
        error!(
            correlation_id = %ctx.trace_id,
            upstream = %upstream_name,
            attempts = max_retries,
            selection_duration_ms = selection_duration.as_millis(),
            last_error = ?last_error,
            "All upstream selection attempts failed"
        );
        self.log_manager.log_request_error(
            "error",
            "All upstream selection attempts failed",
            &ctx.trace_id,
            ctx.route_id.as_deref(),
            Some(upstream_name),
            Some(format!("attempts={} error={:?}", max_retries, last_error)),
        );

        // If this request had already fallen back, the fallback chain is now
        // exhausted too — record that separately.
        if ctx.used_fallback() {
            if let Some(metrics) = get_fallback_metrics() {
                metrics.record_fallback_exhausted(ctx.route_id.as_deref().unwrap_or("unknown"));
            }
        }

        Err(Error::explain(
            ErrorType::InternalError,
            format!("All upstream attempts failed: {:?}", last_error),
        ))
    }
745
746 async fn request_filter(
747 &self,
748 session: &mut Session,
749 ctx: &mut Self::CTX,
750 ) -> Result<bool, Box<Error>> {
751 trace!(
752 correlation_id = %ctx.trace_id,
753 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
754 "Starting request filter phase"
755 );
756
757 if let Some(server_addr) = session.downstream_session.server_addr() {
759 let server_addr_str = server_addr.to_string();
760 let config = ctx
761 .config
762 .get_or_insert_with(|| self.config_manager.current());
763 for listener in &config.listeners {
764 if listener.address == server_addr_str {
765 session.downstream_session.set_read_timeout(Some(
767 std::time::Duration::from_secs(listener.request_timeout_secs),
768 ));
769 ctx.listener_keepalive_timeout_secs = Some(listener.keepalive_timeout_secs);
771 break;
772 }
773 }
774 }
775
776 if let Some(route_id) = ctx.route_id.as_deref() {
779 if self.rate_limit_manager.has_route_limiter(route_id) {
780 let rate_result = self.rate_limit_manager.check(
781 route_id,
782 &ctx.client_ip,
783 &ctx.path,
784 Option::<&NoHeaderAccessor>::None,
785 );
786
787 if rate_result.limit > 0 {
789 ctx.rate_limit_info = Some(super::context::RateLimitHeaderInfo {
790 limit: rate_result.limit,
791 remaining: rate_result.remaining,
792 reset_at: rate_result.reset_at,
793 });
794 }
795
796 if !rate_result.allowed {
797 use grapsus_config::RateLimitAction;
798
799 match rate_result.action {
800 RateLimitAction::Reject => {
801 warn!(
802 correlation_id = %ctx.trace_id,
803 route_id = route_id,
804 client_ip = %ctx.client_ip,
805 limiter = %rate_result.limiter,
806 limit = rate_result.limit,
807 remaining = rate_result.remaining,
808 "Request rate limited"
809 );
810 self.metrics.record_blocked_request("rate_limited");
811
812 let audit_entry = AuditLogEntry::rate_limited(
814 &ctx.trace_id,
815 &ctx.method,
816 &ctx.path,
817 &ctx.client_ip,
818 &rate_result.limiter,
819 )
820 .with_route_id(route_id)
821 .with_status_code(rate_result.status_code);
822 self.log_manager.log_audit(&audit_entry);
823
824 let body = rate_result
826 .message
827 .unwrap_or_else(|| "Rate limit exceeded".to_string());
828
829 let retry_after = rate_result.reset_at.saturating_sub(
831 std::time::SystemTime::now()
832 .duration_since(std::time::UNIX_EPOCH)
833 .unwrap_or_default()
834 .as_secs(),
835 );
836 crate::http_helpers::write_rate_limit_error(
837 session,
838 rate_result.status_code,
839 &body,
840 rate_result.limit,
841 rate_result.remaining,
842 rate_result.reset_at,
843 retry_after,
844 )
845 .await?;
846 return Ok(true); }
848 RateLimitAction::LogOnly => {
849 debug!(
850 correlation_id = %ctx.trace_id,
851 route_id = route_id,
852 "Rate limit exceeded (log only mode)"
853 );
854 }
856 RateLimitAction::Delay => {
857 if let Some(delay_ms) = rate_result.suggested_delay_ms {
859 let actual_delay = delay_ms.min(rate_result.max_delay_ms);
861
862 if actual_delay > 0 {
863 debug!(
864 correlation_id = %ctx.trace_id,
865 route_id = route_id,
866 suggested_delay_ms = delay_ms,
867 max_delay_ms = rate_result.max_delay_ms,
868 actual_delay_ms = actual_delay,
869 "Applying rate limit delay"
870 );
871
872 tokio::time::sleep(std::time::Duration::from_millis(
873 actual_delay,
874 ))
875 .await;
876 }
877 }
878 }
880 }
881 }
882 }
883 }
884
885 if let Some(route_id) = ctx.route_id.as_deref() {
888 if let Some(ref route_config) = ctx.route_config {
889 if route_config.service_type == grapsus_config::ServiceType::Inference
890 && self.inference_rate_limit_manager.has_route(route_id)
891 {
892 let headers = &session.req_header().headers;
895
896 let body = ctx.body_buffer.as_slice();
898
899 let rate_limit_key = &ctx.client_ip;
901
902 if let Some(check_result) = self.inference_rate_limit_manager.check(
903 route_id,
904 rate_limit_key,
905 headers,
906 body,
907 ) {
908 ctx.inference_rate_limit_enabled = true;
910 ctx.inference_estimated_tokens = check_result.estimated_tokens;
911 ctx.inference_rate_limit_key = Some(rate_limit_key.to_string());
912 ctx.inference_model = check_result.model.clone();
913
914 if !check_result.is_allowed() {
915 let retry_after_ms = check_result.retry_after_ms();
916 let retry_after_secs = retry_after_ms.div_ceil(1000);
917
918 warn!(
919 correlation_id = %ctx.trace_id,
920 route_id = route_id,
921 client_ip = %ctx.client_ip,
922 estimated_tokens = check_result.estimated_tokens,
923 model = ?check_result.model,
924 retry_after_ms = retry_after_ms,
925 "Inference rate limit exceeded (tokens)"
926 );
927 self.metrics
928 .record_blocked_request("inference_rate_limited");
929
930 let audit_entry = AuditLogEntry::new(
932 &ctx.trace_id,
933 AuditEventType::RateLimitExceeded,
934 &ctx.method,
935 &ctx.path,
936 &ctx.client_ip,
937 )
938 .with_route_id(route_id)
939 .with_status_code(429)
940 .with_reason(format!(
941 "Token rate limit exceeded: estimated {} tokens, model={:?}",
942 check_result.estimated_tokens, check_result.model
943 ));
944 self.log_manager.log_audit(&audit_entry);
945
946 let body = "Token rate limit exceeded";
948 let reset_at = std::time::SystemTime::now()
949 .duration_since(std::time::UNIX_EPOCH)
950 .unwrap_or_default()
951 .as_secs()
952 + retry_after_secs;
953
954 crate::http_helpers::write_rate_limit_error(
956 session,
957 429,
958 body,
959 0, 0, reset_at,
962 retry_after_secs,
963 )
964 .await?;
965 return Ok(true); }
967
968 trace!(
969 correlation_id = %ctx.trace_id,
970 route_id = route_id,
971 estimated_tokens = check_result.estimated_tokens,
972 model = ?check_result.model,
973 "Inference rate limit check passed"
974 );
975
976 if self.inference_rate_limit_manager.has_budget(route_id) {
978 ctx.inference_budget_enabled = true;
979
980 if let Some(budget_result) =
981 self.inference_rate_limit_manager.check_budget(
982 route_id,
983 rate_limit_key,
984 check_result.estimated_tokens,
985 )
986 {
987 if !budget_result.is_allowed() {
988 let retry_after_secs = budget_result.retry_after_secs();
989
990 warn!(
991 correlation_id = %ctx.trace_id,
992 route_id = route_id,
993 client_ip = %ctx.client_ip,
994 estimated_tokens = check_result.estimated_tokens,
995 retry_after_secs = retry_after_secs,
996 "Token budget exhausted"
997 );
998
999 ctx.inference_budget_exhausted = true;
1000 self.metrics.record_blocked_request("budget_exhausted");
1001
1002 let audit_entry = AuditLogEntry::new(
1004 &ctx.trace_id,
1005 AuditEventType::RateLimitExceeded,
1006 &ctx.method,
1007 &ctx.path,
1008 &ctx.client_ip,
1009 )
1010 .with_route_id(route_id)
1011 .with_status_code(429)
1012 .with_reason("Token budget exhausted".to_string());
1013 self.log_manager.log_audit(&audit_entry);
1014
1015 let body = "Token budget exhausted";
1017 let reset_at = std::time::SystemTime::now()
1018 .duration_since(std::time::UNIX_EPOCH)
1019 .unwrap_or_default()
1020 .as_secs()
1021 + retry_after_secs;
1022
1023 crate::http_helpers::write_rate_limit_error(
1024 session,
1025 429,
1026 body,
1027 0,
1028 0,
1029 reset_at,
1030 retry_after_secs,
1031 )
1032 .await?;
1033 return Ok(true);
1034 }
1035
1036 let remaining = match &budget_result {
1038 grapsus_common::budget::BudgetCheckResult::Allowed {
1039 remaining,
1040 } => *remaining as i64,
1041 grapsus_common::budget::BudgetCheckResult::Soft {
1042 remaining,
1043 ..
1044 } => *remaining,
1045 _ => 0,
1046 };
1047 ctx.inference_budget_remaining = Some(remaining);
1048
1049 if let Some(status) = self
1051 .inference_rate_limit_manager
1052 .budget_status(route_id, rate_limit_key)
1053 {
1054 ctx.inference_budget_period_reset = Some(status.period_end);
1055 }
1056
1057 trace!(
1058 correlation_id = %ctx.trace_id,
1059 route_id = route_id,
1060 budget_remaining = remaining,
1061 "Token budget check passed"
1062 );
1063 }
1064 }
1065
1066 if self
1068 .inference_rate_limit_manager
1069 .has_cost_attribution(route_id)
1070 {
1071 ctx.inference_cost_enabled = true;
1072 }
1073 }
1074 }
1075 }
1076 }
1077
1078 if let Some(ref route_config) = ctx.route_config {
1080 if let Some(ref inference) = route_config.inference {
1081 if let Some(ref guardrails) = inference.guardrails {
1082 if let Some(ref pi_config) = guardrails.prompt_injection {
1083 if pi_config.enabled && !ctx.body_buffer.is_empty() {
1084 ctx.guardrails_enabled = true;
1085
1086 if let Some(content) = extract_inference_content(&ctx.body_buffer) {
1088 let result = self
1089 .guardrail_processor
1090 .check_prompt_injection(
1091 pi_config,
1092 &content,
1093 ctx.inference_model.as_deref(),
1094 ctx.route_id.as_deref(),
1095 &ctx.trace_id,
1096 )
1097 .await;
1098
1099 match result {
1100 PromptInjectionResult::Blocked {
1101 status,
1102 message,
1103 detections,
1104 } => {
1105 warn!(
1106 correlation_id = %ctx.trace_id,
1107 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1108 detection_count = detections.len(),
1109 "Prompt injection detected, blocking request"
1110 );
1111
1112 self.metrics.record_blocked_request("prompt_injection");
1113
1114 ctx.guardrail_detection_categories =
1116 detections.iter().map(|d| d.category.clone()).collect();
1117
1118 let audit_entry = AuditLogEntry::new(
1120 &ctx.trace_id,
1121 AuditEventType::Blocked,
1122 &ctx.method,
1123 &ctx.path,
1124 &ctx.client_ip,
1125 )
1126 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1127 .with_status_code(status)
1128 .with_reason("Prompt injection detected".to_string());
1129 self.log_manager.log_audit(&audit_entry);
1130
1131 crate::http_helpers::write_json_error(
1133 session,
1134 status,
1135 "prompt_injection_blocked",
1136 Some(&message),
1137 )
1138 .await?;
1139 return Ok(true);
1140 }
1141 PromptInjectionResult::Detected { detections } => {
1142 warn!(
1144 correlation_id = %ctx.trace_id,
1145 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1146 detection_count = detections.len(),
1147 "Prompt injection detected (logged only)"
1148 );
1149 ctx.guardrail_detection_categories =
1150 detections.iter().map(|d| d.category.clone()).collect();
1151 }
1152 PromptInjectionResult::Warning { detections } => {
1153 ctx.guardrail_warning = true;
1155 ctx.guardrail_detection_categories =
1156 detections.iter().map(|d| d.category.clone()).collect();
1157 debug!(
1158 correlation_id = %ctx.trace_id,
1159 "Prompt injection warning set"
1160 );
1161 }
1162 PromptInjectionResult::Clean => {
1163 trace!(
1164 correlation_id = %ctx.trace_id,
1165 "No prompt injection detected"
1166 );
1167 }
1168 PromptInjectionResult::Error { message } => {
1169 trace!(
1171 correlation_id = %ctx.trace_id,
1172 error = %message,
1173 "Prompt injection check error (failure mode applied)"
1174 );
1175 }
1176 }
1177 }
1178 }
1179 }
1180 }
1181 }
1182 }
1183
1184 if let Some(route_id) = ctx.route_id.as_deref() {
1186 if let Some(ref route_config) = ctx.route_config {
1187 for filter_id in &route_config.filters {
1188 if let Some(result) = self.geo_filter_manager.check(filter_id, &ctx.client_ip) {
1189 ctx.geo_country_code = result.country_code.clone();
1191 ctx.geo_lookup_performed = true;
1192
1193 if !result.allowed {
1194 warn!(
1195 correlation_id = %ctx.trace_id,
1196 route_id = route_id,
1197 client_ip = %ctx.client_ip,
1198 country = ?result.country_code,
1199 filter_id = %filter_id,
1200 "Request blocked by geo filter"
1201 );
1202 self.metrics.record_blocked_request("geo_blocked");
1203
1204 let audit_entry = AuditLogEntry::new(
1206 &ctx.trace_id,
1207 AuditEventType::Blocked,
1208 &ctx.method,
1209 &ctx.path,
1210 &ctx.client_ip,
1211 )
1212 .with_route_id(route_id)
1213 .with_status_code(result.status_code)
1214 .with_reason(format!(
1215 "Geo blocked: country={}, filter={}",
1216 result.country_code.as_deref().unwrap_or("unknown"),
1217 filter_id
1218 ));
1219 self.log_manager.log_audit(&audit_entry);
1220
1221 let body = result
1223 .block_message
1224 .unwrap_or_else(|| "Access denied".to_string());
1225
1226 crate::http_helpers::write_error(
1227 session,
1228 result.status_code,
1229 &body,
1230 "text/plain",
1231 )
1232 .await?;
1233 return Ok(true); }
1235
1236 break;
1238 }
1239 }
1240 }
1241 }
1242
1243 let config_for_filters = std::sync::Arc::clone(
1246 ctx.config
1247 .get_or_insert_with(|| self.config_manager.current()),
1248 );
1249 if super::filters::apply_request_filters(session, ctx, &config_for_filters).await? {
1250 return Ok(true); }
1252
1253 let is_websocket_upgrade = session
1255 .req_header()
1256 .headers
1257 .get(http::header::UPGRADE)
1258 .map(|v| v.as_bytes().eq_ignore_ascii_case(b"websocket"))
1259 .unwrap_or(false);
1260
1261 if is_websocket_upgrade {
1262 ctx.is_websocket_upgrade = true;
1263
1264 if let Some(ref route_config) = ctx.route_config {
1266 if !route_config.websocket {
1267 warn!(
1268 correlation_id = %ctx.trace_id,
1269 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1270 client_ip = %ctx.client_ip,
1271 "WebSocket upgrade rejected: not enabled for route"
1272 );
1273
1274 self.metrics.record_blocked_request("websocket_not_enabled");
1275
1276 let audit_entry = AuditLogEntry::new(
1278 &ctx.trace_id,
1279 AuditEventType::Blocked,
1280 &ctx.method,
1281 &ctx.path,
1282 &ctx.client_ip,
1283 )
1284 .with_route_id(ctx.route_id.as_deref().unwrap_or("unknown"))
1285 .with_action("websocket_rejected")
1286 .with_reason("WebSocket not enabled for route");
1287 self.log_manager.log_audit(&audit_entry);
1288
1289 crate::http_helpers::write_error(
1291 session,
1292 403,
1293 "WebSocket not enabled for this route",
1294 "text/plain",
1295 )
1296 .await?;
1297 return Ok(true); }
1299
1300 debug!(
1301 correlation_id = %ctx.trace_id,
1302 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1303 "WebSocket upgrade request allowed"
1304 );
1305
1306 if route_config.websocket_inspection {
1308 let has_compression = session
1310 .req_header()
1311 .headers
1312 .get("Sec-WebSocket-Extensions")
1313 .and_then(|v| v.to_str().ok())
1314 .map(|s| s.contains("permessage-deflate"))
1315 .unwrap_or(false);
1316
1317 if has_compression {
1318 debug!(
1319 correlation_id = %ctx.trace_id,
1320 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1321 "WebSocket inspection skipped: permessage-deflate negotiated"
1322 );
1323 ctx.websocket_skip_inspection = true;
1324 } else {
1325 ctx.websocket_inspection_enabled = true;
1326
1327 ctx.websocket_inspection_agents = self.agent_manager.get_agents_for_event(
1329 grapsus_agent_protocol::EventType::WebSocketFrame,
1330 );
1331
1332 debug!(
1333 correlation_id = %ctx.trace_id,
1334 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1335 agent_count = ctx.websocket_inspection_agents.len(),
1336 "WebSocket frame inspection enabled"
1337 );
1338 }
1339 }
1340 }
1341 }
1342
1343 if let Some(route_config) = ctx.route_config.clone() {
1346 if route_config.service_type == grapsus_config::ServiceType::Static {
1347 trace!(
1348 correlation_id = %ctx.trace_id,
1349 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1350 "Handling static file route"
1351 );
1352 let route_match = crate::routing::RouteMatch {
1354 route_id: grapsus_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1355 config: route_config.clone(),
1356 };
1357 return self.handle_static_route(session, ctx, &route_match).await;
1358 } else if route_config.service_type == grapsus_config::ServiceType::Builtin {
1359 trace!(
1360 correlation_id = %ctx.trace_id,
1361 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
1362 builtin_handler = ?route_config.builtin_handler,
1363 "Handling builtin route"
1364 );
1365 let route_match = crate::routing::RouteMatch {
1367 route_id: grapsus_common::RouteId::new(ctx.route_id.as_deref().unwrap_or("")),
1368 config: route_config.clone(),
1369 };
1370 return self.handle_builtin_route(session, ctx, &route_match).await;
1371 }
1372 }
1373
1374 if let Some(route_id) = ctx.route_id.clone() {
1376 if let Some(validator) = self.validators.get(&route_id).await {
1377 trace!(
1378 correlation_id = %ctx.trace_id,
1379 route_id = %route_id,
1380 "Running API schema validation"
1381 );
1382 if let Some(result) = self
1383 .validate_api_request(session, ctx, &route_id, &validator)
1384 .await?
1385 {
1386 debug!(
1387 correlation_id = %ctx.trace_id,
1388 route_id = %route_id,
1389 validation_passed = result,
1390 "API validation complete"
1391 );
1392 return Ok(result);
1393 }
1394 }
1395 }
1396
1397 let client_addr = session
1399 .client_addr()
1400 .map(|a| format!("{}", a))
1401 .unwrap_or_else(|| "unknown".to_string());
1402 let client_port = session.client_addr().map(|_| 0).unwrap_or(0);
1403
1404 let req_header = session.req_header_mut();
1405
1406 req_header
1408 .insert_header("X-Correlation-Id", &ctx.trace_id)
1409 .ok();
1410 req_header.insert_header("X-Forwarded-By", "Grapsus").ok();
1411
1412 let config = ctx
1414 .config
1415 .get_or_insert_with(|| self.config_manager.current());
1416
1417 const HEADER_LIMIT_THRESHOLD: usize = 1024 * 1024; let header_count = req_header.headers.len();
1422 if config.limits.max_header_count < HEADER_LIMIT_THRESHOLD
1423 && header_count > config.limits.max_header_count
1424 {
1425 warn!(
1426 correlation_id = %ctx.trace_id,
1427 header_count = header_count,
1428 limit = config.limits.max_header_count,
1429 "Request blocked: exceeds header count limit"
1430 );
1431
1432 self.metrics.record_blocked_request("header_count_exceeded");
1433 return Err(Error::explain(ErrorType::InternalError, "Too many headers"));
1434 }
1435
1436 if config.limits.max_header_size_bytes < HEADER_LIMIT_THRESHOLD {
1438 let total_header_size: usize = req_header
1439 .headers
1440 .iter()
1441 .map(|(k, v)| k.as_str().len() + v.len())
1442 .sum();
1443
1444 if total_header_size > config.limits.max_header_size_bytes {
1445 warn!(
1446 correlation_id = %ctx.trace_id,
1447 header_size = total_header_size,
1448 limit = config.limits.max_header_size_bytes,
1449 "Request blocked: exceeds header size limit"
1450 );
1451
1452 self.metrics.record_blocked_request("header_size_exceeded");
1453 return Err(Error::explain(
1454 ErrorType::InternalError,
1455 "Headers too large",
1456 ));
1457 }
1458 }
1459
1460 trace!(
1462 correlation_id = %ctx.trace_id,
1463 "Processing request through agents"
1464 );
1465 if let Err(e) = self
1466 .process_agents(session, ctx, &client_addr, client_port)
1467 .await
1468 {
1469 if let ErrorType::HTTPStatus(status) = e.etype() {
1472 let error_msg = e.to_string();
1474 let body = error_msg
1475 .split("context:")
1476 .nth(1)
1477 .map(|s| s.trim())
1478 .unwrap_or("Request blocked");
1479 debug!(
1480 correlation_id = %ctx.trace_id,
1481 status = status,
1482 body = %body,
1483 "Sending HTTP error response for agent block"
1484 );
1485 crate::http_helpers::write_error(session, *status, body, "text/plain").await?;
1486 return Ok(true); }
1488 return Err(e);
1490 }
1491
1492 trace!(
1493 correlation_id = %ctx.trace_id,
1494 "Request filter phase complete, forwarding to upstream"
1495 );
1496
1497 Ok(false) }
1499
    /// Per-chunk filter for the downstream request body before it is forwarded
    /// upstream.
    ///
    /// Three concerns, in order:
    /// 1. WebSocket tunnels: hand the raw bytes to the negotiated frame handler
    ///    and either forward its output or abort the connection.
    /// 2. Accounting and limits: track total body bytes and reject the request
    ///    once `limits.max_body_size_bytes` is exceeded.
    /// 3. Agent body inspection: feed the body to inspection agents according to
    ///    the route's `BodyStreamingMode` (Stream / Hybrid / Buffer).
    ///
    /// Returns `Ok(())` to keep streaming; an `Err` aborts the request.
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        use grapsus_config::BodyStreamingMode;

        // WebSocket connections bypass all HTTP body handling: frames go
        // straight through the frame handler (client -> server direction here).
        if ctx.is_websocket_upgrade {
            if let Some(ref handler) = ctx.websocket_handler {
                // take() hands ownership of the chunk to the handler; it may
                // buffer partial frames, so Forward can return None.
                let result = handler.process_client_data(body.take()).await;
                match result {
                    crate::websocket::ProcessResult::Forward(data) => {
                        *body = data;
                    }
                    crate::websocket::ProcessResult::Close(reason) => {
                        warn!(
                            correlation_id = %ctx.trace_id,
                            code = reason.code,
                            reason = %reason.reason,
                            "WebSocket connection closed by agent (client->server)"
                        );
                        // Erroring out tears down the tunnel in both directions.
                        return Err(Error::explain(
                            ErrorType::InternalError,
                            format!("WebSocket closed: {} {}", reason.code, reason.reason),
                        ));
                    }
                }
            }
            return Ok(());
        }

        let chunk_len = body.as_ref().map(|b| b.len()).unwrap_or(0);
        if chunk_len > 0 {
            ctx.request_body_bytes += chunk_len as u64;

            trace!(
                correlation_id = %ctx.trace_id,
                chunk_size = chunk_len,
                total_body_bytes = ctx.request_body_bytes,
                end_of_stream = end_of_stream,
                streaming_mode = ?ctx.request_body_streaming_mode,
                "Processing request body chunk"
            );

            // Lazily materialize the config snapshot on first use and cache it
            // in the context so every chunk sees the same configuration.
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            // Enforced cumulatively, so oversized bodies are rejected mid-stream
            // rather than after full buffering.
            if ctx.request_body_bytes > config.limits.max_body_size_bytes as u64 {
                warn!(
                    correlation_id = %ctx.trace_id,
                    body_bytes = ctx.request_body_bytes,
                    limit = config.limits.max_body_size_bytes,
                    "Request body size limit exceeded"
                );
                self.metrics.record_blocked_request("body_size_exceeded");
                return Err(Error::explain(
                    ErrorType::InternalError,
                    "Request body too large",
                ));
            }
        }

        // Agent inspection only runs when the route enabled it AND at least one
        // agent subscribed to body events.
        if ctx.body_inspection_enabled && !ctx.body_inspection_agents.is_empty() {
            let config = ctx
                .config
                .get_or_insert_with(|| self.config_manager.current());
            let max_inspection_bytes = config
                .waf
                .as_ref()
                .map(|w| w.body_inspection.max_inspection_bytes as u64)
                .unwrap_or(1024 * 1024);

            match ctx.request_body_streaming_mode {
                BodyStreamingMode::Stream => {
                    if body.is_some() {
                        // Forward each chunk to agents as it arrives.
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    } else if end_of_stream && ctx.agent_needs_more {
                        // Final empty call: an agent asked for more data, so it
                        // still needs to see the end-of-stream marker.
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Hybrid { buffer_threshold } => {
                    // Buffer up to `buffer_threshold` bytes first, then switch
                    // to pure streaming for the remainder of the body.
                    if ctx.body_bytes_inspected < buffer_threshold as u64 {
                        if let Some(ref chunk) = body {
                            let bytes_to_buffer = std::cmp::min(
                                chunk.len(),
                                (buffer_threshold as u64 - ctx.body_bytes_inspected) as usize,
                            );
                            ctx.body_buffer.extend_from_slice(&chunk[..bytes_to_buffer]);
                            ctx.body_bytes_inspected += bytes_to_buffer as u64;

                            if ctx.body_bytes_inspected >= buffer_threshold as u64 || end_of_stream
                            {
                                // "complete" only when this is the last chunk
                                // AND nothing of it spilled past the threshold.
                                self.send_buffered_body_to_agents(
                                    end_of_stream && chunk.len() == bytes_to_buffer,
                                    ctx,
                                )
                                .await?;
                                ctx.body_buffer.clear();

                                // Any bytes beyond the threshold in this same
                                // chunk are streamed immediately.
                                if bytes_to_buffer < chunk.len() {
                                    let remaining = chunk.slice(bytes_to_buffer..);
                                    let mut remaining_body = Some(remaining);
                                    self.process_body_chunk_streaming(
                                        &mut remaining_body,
                                        end_of_stream,
                                        ctx,
                                    )
                                    .await?;
                                }
                            }
                        }
                        // NOTE(review): if the final call arrives with
                        // `body == None` while the buffer is non-empty and
                        // under threshold, the buffer is never flushed here
                        // (unlike Buffer mode below) — confirm whether pingora
                        // guarantees data on the end_of_stream call.
                    } else {
                        // Threshold already consumed: stream the rest.
                        self.process_body_chunk_streaming(body, end_of_stream, ctx)
                            .await?;
                    }
                }
                BodyStreamingMode::Buffer => {
                    // Accumulate up to max_inspection_bytes, then send the
                    // buffer to agents once full or at end of stream.
                    if let Some(ref chunk) = body {
                        if ctx.body_bytes_inspected < max_inspection_bytes {
                            let bytes_to_inspect = std::cmp::min(
                                chunk.len() as u64,
                                max_inspection_bytes - ctx.body_bytes_inspected,
                            ) as usize;

                            ctx.body_buffer
                                .extend_from_slice(&chunk[..bytes_to_inspect]);
                            ctx.body_bytes_inspected += bytes_to_inspect as u64;

                            trace!(
                                correlation_id = %ctx.trace_id,
                                bytes_inspected = ctx.body_bytes_inspected,
                                max_inspection_bytes = max_inspection_bytes,
                                buffer_size = ctx.body_buffer.len(),
                                "Buffering body for agent inspection"
                            );
                        }
                    }

                    // Flush check runs even when this call carried no data, so
                    // a trailing empty end_of_stream still delivers the buffer.
                    let should_send =
                        end_of_stream || ctx.body_bytes_inspected >= max_inspection_bytes;
                    if should_send && !ctx.body_buffer.is_empty() {
                        self.send_buffered_body_to_agents(end_of_stream, ctx)
                            .await?;
                        ctx.body_buffer.clear();
                    }
                }
            }
        }

        if end_of_stream {
            trace!(
                correlation_id = %ctx.trace_id,
                total_body_bytes = ctx.request_body_bytes,
                bytes_inspected = ctx.body_bytes_inspected,
                "Request body complete"
            );
        }

        Ok(())
    }
1686
    /// Filters the upstream response headers before they are sent downstream.
    ///
    /// Runs, in order: WebSocket 101 handling, informational header injection
    /// (correlation id, rate limit, inference budget, geo), route-level response
    /// header policies, Cache-Status, config response filters, compression and
    /// keepalive tuning, sticky-session cookie, guardrail/fallback headers,
    /// SSE token-counter setup, agent response-header processing, error-response
    /// handling, and finally metrics / tracing / passive-health / logging.
    ///
    /// Header-injection failures on the `?`-propagated `insert_header` calls
    /// abort the response; policy/agent header edits use `.ok()` (best effort).
    async fn response_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<(), Box<Error>> {
        let status = upstream_response.status.as_u16();
        let duration = ctx.elapsed();

        trace!(
            correlation_id = %ctx.trace_id,
            status = status,
            "Starting response filter phase"
        );

        // 101 Switching Protocols on a WebSocket upgrade: decide whether frame
        // inspection is active for the lifetime of this tunnel.
        if status == 101 && ctx.is_websocket_upgrade {
            if ctx.websocket_inspection_enabled && !ctx.websocket_skip_inspection {
                let inspector = crate::websocket::WebSocketInspector::with_metrics(
                    self.agent_manager.clone(),
                    ctx.route_id
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string()),
                    ctx.trace_id.clone(),
                    ctx.client_ip.clone(),
                    // 100: inspector capacity/limit parameter — see
                    // WebSocketInspector::with_metrics for its exact meaning.
                    100,
                    Some(self.metrics.clone()),
                );

                let handler = crate::websocket::WebSocketHandler::new(
                    std::sync::Arc::new(inspector),
                    // 1 MiB maximum frame/buffer size for the handler.
                    1024 * 1024,
                );

                // Stored on the context; the body filters route frames
                // through this handler in both directions.
                ctx.websocket_handler = Some(std::sync::Arc::new(handler));

                info!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    agent_count = ctx.websocket_inspection_agents.len(),
                    "WebSocket upgrade successful, frame inspection enabled"
                );
            } else if ctx.websocket_skip_inspection {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful, inspection skipped (compression negotiated)"
                );
            } else {
                debug!(
                    correlation_id = %ctx.trace_id,
                    route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                    "WebSocket upgrade successful"
                );
            }
        }

        upstream_response.insert_header("X-Correlation-Id", &ctx.trace_id)?;

        // Surface rate-limit state to the client when a limiter ran.
        if let Some(ref rate_info) = ctx.rate_limit_info {
            upstream_response.insert_header("X-RateLimit-Limit", rate_info.limit.to_string())?;
            upstream_response
                .insert_header("X-RateLimit-Remaining", rate_info.remaining.to_string())?;
            upstream_response.insert_header("X-RateLimit-Reset", rate_info.reset_at.to_string())?;
        }

        // Inference budget headers (LLM routes with spend budgets).
        if ctx.inference_budget_enabled {
            if let Some(remaining) = ctx.inference_budget_remaining {
                upstream_response.insert_header("X-Budget-Remaining", remaining.to_string())?;
            }
            if let Some(period_reset) = ctx.inference_budget_period_reset {
                // Render the reset as ISO-8601 UTC; fall back to the raw epoch
                // seconds if the timestamp is out of chrono's range.
                let reset_datetime = chrono::DateTime::from_timestamp(period_reset as i64, 0)
                    .map(|dt| dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
                    .unwrap_or_else(|| period_reset.to_string());
                upstream_response.insert_header("X-Budget-Period-Reset", reset_datetime)?;
            }
        }

        if let Some(ref country_code) = ctx.geo_country_code {
            upstream_response.insert_header("X-GeoIP-Country", country_code)?;
        }

        // Route-level response header policies: rename, set, add, remove.
        // All are best-effort (.ok()) — an invalid header name/value is skipped.
        if let Some(ref route_config) = ctx.route_config {
            let mods = &route_config.policies.response_headers;
            for (old_name, new_name) in &mods.rename {
                if let Some(value) = upstream_response
                    .headers
                    .get(old_name)
                    .and_then(|v| v.to_str().ok())
                {
                    // Copy the value out before mutating the header map.
                    let owned = value.to_string();
                    upstream_response
                        .insert_header(new_name.clone(), &owned)
                        .ok();
                    upstream_response.remove_header(old_name);
                }
            }
            for (name, value) in &mods.set {
                upstream_response
                    .insert_header(name.clone(), value.as_str())
                    .ok();
            }
            for (name, value) in &mods.add {
                // append_header keeps existing values (multi-value header).
                upstream_response
                    .append_header(name.clone(), value.as_str())
                    .ok();
            }
            for name in &mods.remove {
                upstream_response.remove_header(name);
            }
        }

        // Optional RFC 9211-style Cache-Status header describing this lookup.
        if let Some(ref cache_status) = ctx.cache_status {
            let status_header_enabled = ctx
                .config
                .as_ref()
                .and_then(|c| c.cache.as_ref())
                .map(|c| c.status_header)
                .unwrap_or(false);

            if status_header_enabled {
                let value = match cache_status {
                    super::context::CacheStatus::HitMemory => "grapsus; hit; detail=memory",
                    super::context::CacheStatus::HitDisk => "grapsus; hit; detail=disk",
                    super::context::CacheStatus::Hit => "grapsus; hit",
                    super::context::CacheStatus::HitStale => "grapsus; fwd=stale",
                    super::context::CacheStatus::Miss => "grapsus; fwd=miss",
                    super::context::CacheStatus::Bypass(reason) => {
                        // Only well-known bypass reasons get a detail token.
                        match *reason {
                            "method" => "grapsus; fwd=bypass; detail=method",
                            "disabled" => "grapsus; fwd=bypass; detail=disabled",
                            "no-route" => "grapsus; fwd=bypass; detail=no-route",
                            _ => "grapsus; fwd=bypass",
                        }
                    }
                };
                upstream_response.insert_header("Cache-Status", value).ok();
            }
        }

        // Arc clone ends the immutable borrow of ctx.config so ctx can be
        // passed mutably to the filter below.
        if let Some(config) = ctx.config.as_ref().map(std::sync::Arc::clone) {
            super::filters::apply_response_filters(upstream_response, ctx, &config);
        }

        if ctx.compress_enabled {
            // Level 6: common default balance of ratio vs CPU.
            session.upstream_compression.adjust_level(6);
        }

        if let Some(keepalive_secs) = ctx.listener_keepalive_timeout_secs {
            session
                .downstream_session
                .set_keepalive(Some(keepalive_secs));
        }

        // First request of a sticky-session assignment: pin the client to the
        // chosen backend via Set-Cookie.
        if ctx.sticky_session_new_assignment {
            if let Some(ref set_cookie_header) = ctx.sticky_session_set_cookie {
                upstream_response.insert_header("Set-Cookie", set_cookie_header)?;
                trace!(
                    correlation_id = %ctx.trace_id,
                    sticky_target_index = ?ctx.sticky_target_index,
                    "Added sticky session Set-Cookie header"
                );
            }
        }

        if ctx.guardrail_warning {
            upstream_response.insert_header("X-Guardrail-Warning", "prompt-injection-detected")?;
        }

        // When a fallback upstream served this request, expose the routing
        // decision in response headers and record success metrics.
        if ctx.used_fallback() {
            upstream_response.insert_header("X-Fallback-Used", "true")?;

            if let Some(ref upstream) = ctx.upstream {
                upstream_response.insert_header("X-Fallback-Upstream", upstream)?;
            }

            if let Some(ref reason) = ctx.fallback_reason {
                upstream_response.insert_header("X-Fallback-Reason", reason.to_string())?;
            }

            if let Some(ref original) = ctx.original_upstream {
                upstream_response.insert_header("X-Original-Upstream", original)?;
            }

            if let Some(ref mapping) = ctx.model_mapping_applied {
                upstream_response
                    .insert_header("X-Model-Mapping", format!("{} -> {}", mapping.0, mapping.1))?;
            }

            trace!(
                correlation_id = %ctx.trace_id,
                fallback_attempt = ctx.fallback_attempt,
                fallback_upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
                original_upstream = ctx.original_upstream.as_deref().unwrap_or("unknown"),
                "Added fallback response headers"
            );

            // Non-error status means the fallback actually rescued the request.
            if status < 400 {
                if let Some(metrics) = get_fallback_metrics() {
                    metrics.record_fallback_success(
                        ctx.route_id.as_deref().unwrap_or("unknown"),
                        ctx.upstream.as_deref().unwrap_or("unknown"),
                    );
                }
            }
        }

        // Token-based inference rate limiting needs to count tokens in the
        // streamed SSE response, so set up a streaming counter here.
        if ctx.inference_rate_limit_enabled {
            let content_type = upstream_response
                .headers
                .get("content-type")
                .and_then(|ct| ct.to_str().ok());

            if is_sse_response(content_type) {
                let provider = ctx
                    .route_config
                    .as_ref()
                    .and_then(|r| r.inference.as_ref())
                    .map(|i| i.provider)
                    .unwrap_or_default();

                ctx.inference_streaming_response = true;
                ctx.inference_streaming_counter = Some(StreamingTokenCounter::new(
                    provider,
                    ctx.inference_model.clone(),
                ));

                trace!(
                    correlation_id = %ctx.trace_id,
                    content_type = ?content_type,
                    model = ?ctx.inference_model,
                    "Initialized streaming token counter for SSE response"
                );
            }
        }

        // Route-attached agents get a look at the response headers and may
        // mutate them; agent failure is non-fatal (response passes unmodified).
        if !ctx.route_agent_ids.is_empty() {
            let agent_ids = ctx.route_agent_ids.clone();
            // Snapshot headers into name -> [values] for the agent protocol;
            // non-UTF-8 values are passed as empty strings.
            let mut resp_headers_map: std::collections::HashMap<String, Vec<String>> =
                std::collections::HashMap::with_capacity(upstream_response.headers.len());
            for (name, value) in upstream_response.headers.iter() {
                resp_headers_map
                    .entry(name.as_str().to_string())
                    .or_default()
                    .push(value.to_str().unwrap_or("").to_string());
            }

            let agent_ctx = crate::agents::AgentCallContext {
                correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
                metadata: grapsus_agent_protocol::RequestMetadata {
                    correlation_id: ctx.trace_id.clone(),
                    request_id: uuid::Uuid::new_v4().to_string(),
                    client_ip: ctx.client_ip.clone(),
                    // Client port is not tracked in ctx; reported as 0.
                    client_port: 0,
                    server_name: ctx.host.clone(),
                    protocol: "HTTP/1.1".to_string(),
                    tls_version: None,
                    tls_cipher: None,
                    route_id: ctx.route_id.clone(),
                    upstream_id: ctx.upstream.clone(),
                    timestamp: chrono::Utc::now().to_rfc3339(),
                    traceparent: ctx.traceparent(),
                },
                route_id: ctx.route_id.clone(),
                upstream_id: ctx.upstream.clone(),
                request_body: None,
                response_body: None,
            };

            match self
                .agent_manager
                .process_response_headers(&agent_ctx, status, &resp_headers_map, &agent_ids)
                .await
            {
                Ok(decision) => {
                    // Apply agent-requested header operations (best effort).
                    for op in &decision.response_headers {
                        match op {
                            grapsus_agent_protocol::HeaderOp::Set { name, value } => {
                                upstream_response
                                    .insert_header(name.clone(), value.as_str())
                                    .ok();
                            }
                            grapsus_agent_protocol::HeaderOp::Add { name, value } => {
                                upstream_response
                                    .append_header(name.clone(), value.as_str())
                                    .ok();
                            }
                            grapsus_agent_protocol::HeaderOp::Remove { name } => {
                                upstream_response.remove_header(name);
                            }
                        }
                    }

                    // If any agent wants the response body, enable buffering in
                    // response_body_filter and force-close the connection —
                    // the body (and thus Content-Length) may be rewritten.
                    let has_body_agents = self
                        .agent_manager
                        .any_agent_handles_event(
                            &agent_ids,
                            grapsus_agent_protocol::EventType::ResponseBodyChunk,
                        )
                        .await;
                    if has_body_agents {
                        ctx.response_agent_processing_enabled = true;
                        upstream_response.insert_header("Connection", "close").ok();
                        session.downstream_session.set_keepalive(None);
                        debug!(
                            correlation_id = %ctx.trace_id,
                            "Enabling response body agent processing (agent subscribes to ResponseBody)"
                        );
                    }

                    debug!(
                        correlation_id = %ctx.trace_id,
                        response_headers_modified = !decision.response_headers.is_empty(),
                        needs_body = ctx.response_agent_processing_enabled,
                        "Response headers processed through agents"
                    );
                }
                Err(e) => {
                    // Fail open: agent errors never block the response.
                    warn!(
                        correlation_id = %ctx.trace_id,
                        error = %e,
                        "Agent response header processing failed, continuing without agent"
                    );
                }
            }
        }

        // Custom error-page / error-response handling for 4xx and 5xx.
        if status >= 400 {
            trace!(
                correlation_id = %ctx.trace_id,
                status = status,
                "Handling error response"
            );
            self.handle_error_response(upstream_response, ctx).await?;
        }

        self.metrics.record_request(
            ctx.route_id.as_deref().unwrap_or("unknown"),
            &ctx.method,
            status,
            duration,
        );

        // Finalize the OpenTelemetry span with outcome details.
        if let Some(ref mut span) = ctx.otel_span {
            span.set_status(status);
            if let Some(ref upstream) = ctx.upstream {
                span.set_upstream(upstream, "");
            }
            if status >= 500 {
                span.record_error(&format!("HTTP {}", status));
            }
        }

        // Passive health: 5xx counts against the upstream; 4xx is treated as
        // a client problem and still counts as success.
        if let Some(ref upstream) = ctx.upstream {
            let success = status < 500;

            trace!(
                correlation_id = %ctx.trace_id,
                upstream = %upstream,
                success = success,
                status = status,
                "Recording passive health check result"
            );

            let error_msg = if !success {
                Some(format!("HTTP {}", status))
            } else {
                None
            };
            self.passive_health
                .record_outcome(upstream, success, error_msg.as_deref())
                .await;

            if let Some(pool) = self.upstream_pools.get(upstream).await {
                pool.report_result(upstream, success).await;
            }

            if !success {
                warn!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream,
                    status = status,
                    "Upstream returned error status"
                );
            }
        }

        // Completion logging at a severity matching the status class.
        if status >= 500 {
            error!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed with server error"
            );
            self.log_manager.log_request_error(
                "error",
                "Request completed with server error",
                &ctx.trace_id,
                ctx.route_id.as_deref(),
                ctx.upstream.as_deref(),
                Some(format!(
                    "status={} method={} path={} duration_ms={}",
                    status,
                    ctx.method,
                    ctx.path,
                    duration.as_millis()
                )),
            );
        } else if status >= 400 {
            warn!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                "Request completed with client error"
            );
            self.log_manager.log_request_error(
                "warn",
                "Request completed with client error",
                &ctx.trace_id,
                ctx.route_id.as_deref(),
                ctx.upstream.as_deref(),
                Some(format!(
                    "status={} method={} path={}",
                    status, ctx.method, ctx.path
                )),
            );
        } else {
            debug!(
                correlation_id = %ctx.trace_id,
                route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                upstream = ctx.upstream.as_deref().unwrap_or("none"),
                method = %ctx.method,
                path = %ctx.path,
                status = status,
                duration_ms = duration.as_millis(),
                attempts = ctx.upstream_attempts,
                "Request completed"
            );
        }

        Ok(())
    }
2170
2171 async fn upstream_request_filter(
2174 &self,
2175 _session: &mut Session,
2176 upstream_request: &mut pingora::http::RequestHeader,
2177 ctx: &mut Self::CTX,
2178 ) -> Result<()>
2179 where
2180 Self::CTX: Send + Sync,
2181 {
2182 trace!(
2183 correlation_id = %ctx.trace_id,
2184 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2185 "Applying upstream request modifications"
2186 );
2187
2188 upstream_request
2190 .insert_header("X-Trace-Id", &ctx.trace_id)
2191 .ok();
2192
2193 if let Some(ref span) = ctx.otel_span {
2195 let sampled = ctx
2196 .trace_context
2197 .as_ref()
2198 .map(|c| c.sampled)
2199 .unwrap_or(true);
2200 let traceparent =
2201 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled);
2202 upstream_request
2203 .insert_header(crate::otel::TRACEPARENT_HEADER, &traceparent)
2204 .ok();
2205 }
2206
2207 upstream_request
2209 .insert_header("X-Forwarded-By", "Grapsus")
2210 .ok();
2211
2212 if let Some(ref route_config) = ctx.route_config {
2216 let mods = &route_config.policies.request_headers;
2217
2218 for (old_name, new_name) in &mods.rename {
2220 if let Some(value) = upstream_request
2221 .headers
2222 .get(old_name)
2223 .and_then(|v| v.to_str().ok())
2224 {
2225 let owned = value.to_string();
2226 upstream_request
2227 .insert_header(new_name.clone(), &owned)
2228 .ok();
2229 upstream_request.remove_header(old_name);
2230 }
2231 }
2232
2233 for (name, value) in &mods.set {
2235 upstream_request
2236 .insert_header(name.clone(), value.as_str())
2237 .ok();
2238 }
2239
2240 for (name, value) in &mods.add {
2242 upstream_request
2243 .append_header(name.clone(), value.as_str())
2244 .ok();
2245 }
2246
2247 for name in &mods.remove {
2249 upstream_request.remove_header(name);
2250 }
2251
2252 trace!(
2253 correlation_id = %ctx.trace_id,
2254 "Applied request header modifications"
2255 );
2256 }
2257
2258 if let Some(ref config) = ctx.config {
2260 super::filters::apply_request_headers_filters(upstream_request, ctx, config);
2261 }
2262
2263 upstream_request.remove_header("X-Internal-Token");
2265 upstream_request.remove_header("Authorization-Internal");
2266
2267 if let Some(ref route_config) = ctx.route_config {
2270 if let Some(ref shadow_config) = route_config.shadow {
2271 let pools_snapshot = self.upstream_pools.snapshot().await;
2273 let upstream_pools = std::sync::Arc::new(pools_snapshot);
2274
2275 let route_id = ctx
2277 .route_id
2278 .clone()
2279 .unwrap_or_else(|| "unknown".to_string());
2280
2281 let shadow_manager = crate::shadow::ShadowManager::new(
2283 upstream_pools,
2284 shadow_config.clone(),
2285 Some(std::sync::Arc::clone(&self.metrics)),
2286 route_id,
2287 );
2288
2289 if shadow_manager.should_shadow(upstream_request) {
2291 trace!(
2292 correlation_id = %ctx.trace_id,
2293 shadow_upstream = %shadow_config.upstream,
2294 percentage = shadow_config.percentage,
2295 "Shadowing request"
2296 );
2297
2298 let shadow_headers = upstream_request.clone();
2300
2301 let shadow_ctx = crate::upstream::RequestContext {
2303 client_ip: ctx.client_ip.parse().ok(),
2304 headers: std::collections::HashMap::new(), path: ctx.path.clone(),
2306 method: ctx.method.clone(),
2307 };
2308
2309 let buffer_body = shadow_config.buffer_body
2311 && crate::shadow::should_buffer_method(&ctx.method);
2312
2313 if buffer_body {
2314 trace!(
2318 correlation_id = %ctx.trace_id,
2319 "Deferring shadow request until body is buffered"
2320 );
2321 ctx.shadow_pending = Some(crate::proxy::context::ShadowPendingRequest {
2322 headers: shadow_headers,
2323 manager: std::sync::Arc::new(shadow_manager),
2324 request_ctx: shadow_ctx,
2325 include_body: true,
2326 });
2327 if !ctx.body_inspection_enabled {
2330 ctx.body_inspection_enabled = true;
2331 }
2334 } else {
2335 shadow_manager.shadow_request(shadow_headers, None, shadow_ctx);
2337 ctx.shadow_sent = true;
2338 }
2339 }
2340 }
2341 }
2342
2343 Ok(())
2344 }
2345
2346 fn response_body_filter(
2352 &self,
2353 _session: &mut Session,
2354 body: &mut Option<Bytes>,
2355 end_of_stream: bool,
2356 ctx: &mut Self::CTX,
2357 ) -> Result<Option<Duration>, Box<Error>> {
2358 if ctx.is_websocket_upgrade {
2361 if let Some(ref handler) = ctx.websocket_handler {
2362 let handler = handler.clone();
2363 let data = body.take();
2364
2365 let result = tokio::task::block_in_place(|| {
2368 tokio::runtime::Handle::current()
2369 .block_on(async { handler.process_server_data(data).await })
2370 });
2371
2372 match result {
2373 crate::websocket::ProcessResult::Forward(data) => {
2374 *body = data;
2375 }
2376 crate::websocket::ProcessResult::Close(reason) => {
2377 warn!(
2378 correlation_id = %ctx.trace_id,
2379 code = reason.code,
2380 reason = %reason.reason,
2381 "WebSocket connection closed by agent (server->client)"
2382 );
2383 let close_frame =
2386 crate::websocket::WebSocketFrame::close(reason.code, &reason.reason);
2387 let codec = crate::websocket::WebSocketCodec::new(1024 * 1024);
2388 if let Ok(encoded) = codec.encode_frame(&close_frame, false) {
2389 *body = Some(Bytes::from(encoded));
2390 }
2391 }
2392 }
2393 }
2394 return Ok(None);
2396 }
2397
2398 if ctx.response_agent_processing_enabled && !ctx.route_agent_ids.is_empty() {
2400 if let Some(ref chunk) = body {
2401 ctx.response_agent_body_buffer.extend_from_slice(chunk);
2402 }
2403
2404 if end_of_stream {
2405 let agent_ids = ctx.route_agent_ids.clone();
2406 let buffer = std::mem::take(&mut ctx.response_agent_body_buffer);
2407 let chunk_index = 0u32;
2408 let total_size = Some(buffer.len());
2409 let trace_id = ctx.trace_id.clone();
2410 let client_ip = ctx.client_ip.clone();
2411 let host = ctx.host.clone();
2412 let route_id = ctx.route_id.clone();
2413 let upstream_id = ctx.upstream.clone();
2414 let traceparent = ctx.traceparent();
2415 let agent_mgr = self.agent_manager.clone();
2416
2417 let result = tokio::task::block_in_place(|| {
2420 tokio::runtime::Handle::current().block_on(async {
2421 let agent_ctx = crate::agents::AgentCallContext {
2422 correlation_id: grapsus_common::CorrelationId::from_string(&trace_id),
2423 metadata: grapsus_agent_protocol::RequestMetadata {
2424 correlation_id: trace_id.clone(),
2425 request_id: uuid::Uuid::new_v4().to_string(),
2426 client_ip,
2427 client_port: 0,
2428 server_name: host,
2429 protocol: "HTTP/1.1".to_string(),
2430 tls_version: None,
2431 tls_cipher: None,
2432 route_id: route_id.clone(),
2433 upstream_id: upstream_id.clone(),
2434 timestamp: chrono::Utc::now().to_rfc3339(),
2435 traceparent,
2436 },
2437 route_id,
2438 upstream_id,
2439 request_body: None,
2440 response_body: None,
2441 };
2442
2443 agent_mgr
2444 .process_response_body_streaming(
2445 &agent_ctx,
2446 &buffer,
2447 true, chunk_index,
2449 buffer.len(),
2450 total_size,
2451 &agent_ids,
2452 )
2453 .await
2454 })
2455 });
2456
2457 match result {
2458 Ok(decision) => {
2459 if let Some(mutation) = decision.response_body_mutation {
2461 if let Some(ref data) = mutation.data {
2462 if !data.is_empty() {
2463 if let Ok(decoded) = base64::Engine::decode(
2465 &base64::engine::general_purpose::STANDARD,
2466 data,
2467 ) {
2468 debug!(
2469 correlation_id = %ctx.trace_id,
2470 original_size = buffer.len(),
2471 new_size = decoded.len(),
2472 "Agent replaced response body"
2473 );
2474 *body = Some(Bytes::from(decoded));
2475 ctx.response_agent_body_complete = true;
2476 } else {
2477 warn!(
2478 correlation_id = %ctx.trace_id,
2479 "Failed to decode agent response body mutation (invalid base64)"
2480 );
2481 }
2482 }
2483 }
2485 }
2487
2488 }
2492 Err(e) => {
2493 warn!(
2494 correlation_id = %ctx.trace_id,
2495 error = %e,
2496 "Agent response body processing failed, passing through original"
2497 );
2498 }
2499 }
2500 } else if !end_of_stream {
2501 *body = None;
2503 return Ok(None);
2504 }
2505 }
2506
2507 if let Some(ref chunk) = body {
2509 ctx.response_bytes += chunk.len() as u64;
2510
2511 trace!(
2512 correlation_id = %ctx.trace_id,
2513 chunk_size = chunk.len(),
2514 total_response_bytes = ctx.response_bytes,
2515 end_of_stream = end_of_stream,
2516 "Processing response body chunk"
2517 );
2518
2519 if let Some(ref mut counter) = ctx.inference_streaming_counter {
2521 let result = counter.process_chunk(chunk);
2522
2523 if result.content.is_some() || result.is_done {
2524 trace!(
2525 correlation_id = %ctx.trace_id,
2526 has_content = result.content.is_some(),
2527 is_done = result.is_done,
2528 chunks_processed = counter.chunks_processed(),
2529 accumulated_content_len = counter.content().len(),
2530 "Processed SSE chunk for token counting"
2531 );
2532 }
2533 }
2534
2535 if ctx.response_body_inspection_enabled
2539 && !ctx.response_body_inspection_agents.is_empty()
2540 {
2541 let config = ctx
2542 .config
2543 .get_or_insert_with(|| self.config_manager.current());
2544 let max_inspection_bytes = config
2545 .waf
2546 .as_ref()
2547 .map(|w| w.body_inspection.max_inspection_bytes as u64)
2548 .unwrap_or(1024 * 1024);
2549
2550 if ctx.response_body_bytes_inspected < max_inspection_bytes {
2551 let bytes_to_inspect = std::cmp::min(
2552 chunk.len() as u64,
2553 max_inspection_bytes - ctx.response_body_bytes_inspected,
2554 ) as usize;
2555
2556 ctx.response_body_bytes_inspected += bytes_to_inspect as u64;
2560 ctx.response_body_chunk_index += 1;
2561
2562 trace!(
2563 correlation_id = %ctx.trace_id,
2564 bytes_inspected = ctx.response_body_bytes_inspected,
2565 max_inspection_bytes = max_inspection_bytes,
2566 chunk_index = ctx.response_body_chunk_index,
2567 "Tracking response body for inspection"
2568 );
2569 }
2570 }
2571 }
2572
2573 if end_of_stream {
2574 trace!(
2575 correlation_id = %ctx.trace_id,
2576 total_response_bytes = ctx.response_bytes,
2577 response_bytes_inspected = ctx.response_body_bytes_inspected,
2578 "Response body complete"
2579 );
2580 }
2581
2582 Ok(None)
2584 }
2585
2586 async fn connected_to_upstream(
2589 &self,
2590 _session: &mut Session,
2591 reused: bool,
2592 peer: &HttpPeer,
2593 #[cfg(unix)] _fd: RawFd,
2594 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
2595 digest: Option<&Digest>,
2596 ctx: &mut Self::CTX,
2597 ) -> Result<(), Box<Error>> {
2598 ctx.connection_reused = reused;
2600
2601 if reused {
2603 trace!(
2604 correlation_id = %ctx.trace_id,
2605 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2606 peer_address = %peer.address(),
2607 "Reusing existing upstream connection"
2608 );
2609 } else {
2610 debug!(
2611 correlation_id = %ctx.trace_id,
2612 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
2613 peer_address = %peer.address(),
2614 ssl = digest.as_ref().map(|d| d.ssl_digest.is_some()).unwrap_or(false),
2615 "Established new upstream connection"
2616 );
2617 }
2618
2619 Ok(())
2620 }
2621
2622 fn request_cache_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<()> {
2632 let route_id = match ctx.route_id.as_deref() {
2634 Some(id) => id,
2635 None => {
2636 trace!(
2637 correlation_id = %ctx.trace_id,
2638 "Cache filter: no route ID, skipping cache"
2639 );
2640 return Ok(());
2641 }
2642 };
2643
2644 if !self.cache_manager.is_enabled(route_id) {
2646 ctx.cache_status = Some(super::context::CacheStatus::Bypass("disabled"));
2647 trace!(
2648 correlation_id = %ctx.trace_id,
2649 route_id = %route_id,
2650 "Cache disabled for route"
2651 );
2652 return Ok(());
2653 }
2654
2655 if !self
2657 .cache_manager
2658 .is_method_cacheable(route_id, &ctx.method)
2659 {
2660 ctx.cache_status = Some(super::context::CacheStatus::Bypass("method"));
2661 trace!(
2662 correlation_id = %ctx.trace_id,
2663 route_id = %route_id,
2664 method = %ctx.method,
2665 "Method not cacheable"
2666 );
2667 return Ok(());
2668 }
2669
2670 if !self.cache_manager.is_path_cacheable(route_id, &ctx.path) {
2672 ctx.cache_status = Some(super::context::CacheStatus::Bypass("excluded"));
2673 trace!(
2674 correlation_id = %ctx.trace_id,
2675 route_id = %route_id,
2676 path = %ctx.path,
2677 "Path excluded from caching"
2678 );
2679 return Ok(());
2680 }
2681
2682 debug!(
2684 correlation_id = %ctx.trace_id,
2685 route_id = %route_id,
2686 method = %ctx.method,
2687 path = %ctx.path,
2688 "Enabling HTTP caching for request"
2689 );
2690
2691 let storage = get_cache_storage();
2693 let eviction = get_cache_eviction();
2694 let cache_lock = get_cache_lock();
2695
2696 session.cache.enable(
2698 storage,
2699 Some(eviction),
2700 None, Some(cache_lock),
2702 None, );
2704
2705 ctx.cache_eligible = true;
2707
2708 trace!(
2709 correlation_id = %ctx.trace_id,
2710 route_id = %route_id,
2711 cache_enabled = session.cache.enabled(),
2712 "Cache enabled for request"
2713 );
2714
2715 Ok(())
2716 }
2717
2718 fn cache_key_callback(&self, session: &Session, ctx: &mut Self::CTX) -> Result<CacheKey> {
2723 let req_header = session.req_header();
2724 let method = req_header.method.as_str();
2725 let path = req_header.uri.path();
2726 let host = ctx.host.as_deref().unwrap_or("unknown");
2727 let query = req_header.uri.query();
2728
2729 let key_string = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2731
2732 trace!(
2733 correlation_id = %ctx.trace_id,
2734 cache_key = %key_string,
2735 "Generated cache key"
2736 );
2737
2738 let host = req_header.uri.authority().map(|a| a.as_str()).unwrap_or("default"); let path_and_query = req_header.uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); Ok(CacheKey::new(host, path_and_query, ""))
2741 }
2742
2743 fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
2748 session.cache.cache_miss();
2750
2751 ctx.cache_status = Some(super::context::CacheStatus::Miss);
2752
2753 if let Some(route_id) = ctx.route_id.as_deref() {
2755 self.cache_manager.stats().record_miss();
2756
2757 trace!(
2758 correlation_id = %ctx.trace_id,
2759 route_id = %route_id,
2760 path = %ctx.path,
2761 "Cache miss"
2762 );
2763 }
2764 }
2765
2766 async fn cache_hit_filter(
2772 &self,
2773 session: &mut Session,
2774 meta: &CacheMeta,
2775 hit_handler: &mut HitHandler,
2776 is_fresh: bool,
2777 ctx: &mut Self::CTX,
2778 ) -> Result<Option<ForcedFreshness>>
2779 where
2780 Self::CTX: Send + Sync,
2781 {
2782 let req_header = session.req_header();
2784 let method = req_header.method.as_str();
2785 let path = req_header.uri.path();
2786 let host = req_header.uri.host().unwrap_or("localhost");
2787 let query = req_header.uri.query();
2788
2789 let cache_key = crate::cache::CacheManager::generate_cache_key(method, host, path, query);
2791
2792 if self.cache_manager.should_invalidate(&cache_key) {
2794 info!(
2795 correlation_id = %ctx.trace_id,
2796 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2797 cache_key = %cache_key,
2798 "Cache entry invalidated by purge request"
2799 );
2800 return Ok(Some(ForcedFreshness::ForceExpired));
2802 }
2803
2804 if is_fresh {
2806 let is_disk_hit = hit_handler
2808 .as_any()
2809 .downcast_ref::<HybridHitHandler>()
2810 .is_some()
2811 || hit_handler
2812 .as_any()
2813 .downcast_ref::<DiskHitHandler>()
2814 .is_some();
2815
2816 let stats = self.cache_manager.stats();
2817 if is_disk_hit {
2818 ctx.cache_status = Some(super::context::CacheStatus::HitDisk);
2819 stats.record_disk_hit();
2820 } else {
2821 ctx.cache_status = Some(super::context::CacheStatus::HitMemory);
2822 stats.record_memory_hit();
2823 }
2824 stats.record_hit();
2825
2826 debug!(
2827 correlation_id = %ctx.trace_id,
2828 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2829 is_fresh = is_fresh,
2830 tier = if is_disk_hit { "disk" } else { "memory" },
2831 "Cache hit (fresh)"
2832 );
2833 } else {
2834 ctx.cache_status = Some(super::context::CacheStatus::HitStale);
2835
2836 trace!(
2837 correlation_id = %ctx.trace_id,
2838 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
2839 is_fresh = is_fresh,
2840 "Cache hit (stale)"
2841 );
2842 }
2843
2844 Ok(None)
2846 }
2847
2848 fn response_cache_filter(
2853 &self,
2854 _session: &Session,
2855 resp: &ResponseHeader,
2856 ctx: &mut Self::CTX,
2857 ) -> Result<RespCacheable> {
2858 let route_id = match ctx.route_id.as_deref() {
2859 Some(id) => id,
2860 None => {
2861 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2862 "no_route",
2863 )));
2864 }
2865 };
2866
2867 if !self.cache_manager.is_enabled(route_id) {
2869 return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
2870 "disabled",
2871 )));
2872 }
2873
2874 let status = resp.status.as_u16();
2875
2876 if !self.cache_manager.is_status_cacheable(route_id, status) {
2878 trace!(
2879 correlation_id = %ctx.trace_id,
2880 route_id = %route_id,
2881 status = status,
2882 "Status code not cacheable"
2883 );
2884 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2885 }
2886
2887 if let Some(cache_control) = resp.headers.get("cache-control") {
2889 if let Ok(cc_str) = cache_control.to_str() {
2890 if crate::cache::CacheManager::is_no_cache(cc_str) {
2891 trace!(
2892 correlation_id = %ctx.trace_id,
2893 route_id = %route_id,
2894 cache_control = %cc_str,
2895 "Response has no-cache directive"
2896 );
2897 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2898 }
2899 }
2900 }
2901
2902 let cache_control = resp
2904 .headers
2905 .get("cache-control")
2906 .and_then(|v| v.to_str().ok());
2907 let ttl = self.cache_manager.calculate_ttl(route_id, cache_control);
2908
2909 if ttl.is_zero() {
2910 trace!(
2911 correlation_id = %ctx.trace_id,
2912 route_id = %route_id,
2913 "TTL is zero, not caching"
2914 );
2915 return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
2916 }
2917
2918 let config = self
2920 .cache_manager
2921 .get_route_config(route_id)
2922 .unwrap_or_default();
2923
2924 let now = std::time::SystemTime::now();
2926 let fresh_until = now + ttl;
2927
2928 let header = resp.clone();
2930
2931 let cache_meta = CacheMeta::new(
2933 fresh_until,
2934 now,
2935 config.stale_while_revalidate_secs as u32,
2936 config.stale_if_error_secs as u32,
2937 header,
2938 );
2939
2940 self.cache_manager.stats().record_store();
2942
2943 debug!(
2944 correlation_id = %ctx.trace_id,
2945 route_id = %route_id,
2946 status = status,
2947 ttl_secs = ttl.as_secs(),
2948 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2949 stale_if_error_secs = config.stale_if_error_secs,
2950 "Caching response"
2951 );
2952
2953 Ok(RespCacheable::Cacheable(cache_meta))
2954 }
2955
2956 fn should_serve_stale(
2960 &self,
2961 _session: &mut Session,
2962 ctx: &mut Self::CTX,
2963 error: Option<&Error>,
2964 ) -> bool {
2965 let route_id = match ctx.route_id.as_deref() {
2966 Some(id) => id,
2967 None => return false,
2968 };
2969
2970 let config = match self.cache_manager.get_route_config(route_id) {
2972 Some(c) => c,
2973 None => return false,
2974 };
2975
2976 if let Some(e) = error {
2978 if e.esource() == &pingora::ErrorSource::Upstream {
2980 debug!(
2981 correlation_id = %ctx.trace_id,
2982 route_id = %route_id,
2983 error = %e,
2984 stale_if_error_secs = config.stale_if_error_secs,
2985 "Considering stale-if-error"
2986 );
2987 return config.stale_if_error_secs > 0;
2988 }
2989 }
2990
2991 if error.is_none() && config.stale_while_revalidate_secs > 0 {
2993 trace!(
2994 correlation_id = %ctx.trace_id,
2995 route_id = %route_id,
2996 stale_while_revalidate_secs = config.stale_while_revalidate_secs,
2997 "Allowing stale-while-revalidate"
2998 );
2999 return true;
3000 }
3001
3002 false
3003 }
3004
3005 fn range_header_filter(
3015 &self,
3016 session: &mut Session,
3017 response: &mut ResponseHeader,
3018 ctx: &mut Self::CTX,
3019 ) -> pingora_proxy::RangeType
3020 where
3021 Self::CTX: Send + Sync,
3022 {
3023 let supports_range = ctx.route_config.as_ref().is_none_or(|config| {
3025 matches!(
3027 config.service_type,
3028 grapsus_config::ServiceType::Static | grapsus_config::ServiceType::Web
3029 )
3030 });
3031
3032 if !supports_range {
3033 trace!(
3034 correlation_id = %ctx.trace_id,
3035 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3036 "Range request not supported for this route type"
3037 );
3038 return pingora_proxy::RangeType::None;
3039 }
3040
3041 let range_type = pingora_proxy::range_header_filter(session.req_header(), response, None);
3043
3044 match &range_type {
3045 pingora_proxy::RangeType::None => {
3046 trace!(
3047 correlation_id = %ctx.trace_id,
3048 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3049 "No range request or not applicable"
3050 );
3051 }
3052 pingora_proxy::RangeType::Single(range) => {
3053 trace!(
3054 correlation_id = %ctx.trace_id,
3055 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3056 range_start = range.start,
3057 range_end = range.end,
3058 "Processing single-range request"
3059 );
3060 }
3061 pingora_proxy::RangeType::Multi(multi) => {
3062 trace!(
3063 correlation_id = %ctx.trace_id,
3064 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3065 range_count = multi.ranges.len(),
3066 "Processing multi-range request"
3067 );
3068 }
3069 pingora_proxy::RangeType::Invalid => {
3070 debug!(
3071 correlation_id = %ctx.trace_id,
3072 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3073 "Invalid range header"
3074 );
3075 }
3076 }
3077
3078 range_type
3079 }
3080
3081 async fn fail_to_proxy(
3084 &self,
3085 session: &mut Session,
3086 e: &Error,
3087 ctx: &mut Self::CTX,
3088 ) -> pingora_proxy::FailToProxy
3089 where
3090 Self::CTX: Send + Sync,
3091 {
3092 let error_code = match e.etype() {
3093 ErrorType::ConnectRefused => 503,
3095 ErrorType::ConnectTimedout => 504,
3096 ErrorType::ConnectNoRoute => 502,
3097
3098 ErrorType::ReadTimedout => 504,
3100 ErrorType::WriteTimedout => 504,
3101
3102 ErrorType::TLSHandshakeFailure => 502,
3104 ErrorType::InvalidCert => 502,
3105
3106 ErrorType::InvalidHTTPHeader => 400,
3108 ErrorType::H2Error => 502,
3109
3110 ErrorType::ConnectProxyFailure => 502,
3112 ErrorType::ConnectionClosed => 502,
3113
3114 ErrorType::HTTPStatus(status) => *status,
3116
3117 ErrorType::InternalError => {
3119 let error_str = e.to_string();
3121 if error_str.contains("upstream")
3122 || error_str.contains("DNS")
3123 || error_str.contains("resolve")
3124 {
3125 502
3126 } else {
3127 500
3128 }
3129 }
3130
3131 _ => 502,
3133 };
3134
3135 error!(
3136 correlation_id = %ctx.trace_id,
3137 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3138 upstream = ctx.upstream.as_deref().unwrap_or("unknown"),
3139 error_type = ?e.etype(),
3140 error = %e,
3141 error_code = error_code,
3142 "Proxy error occurred"
3143 );
3144
3145 self.metrics
3147 .record_blocked_request(&format!("proxy_error_{}", error_code));
3148
3149 let error_message = match error_code {
3153 400 => "Bad Request",
3154 502 => "Bad Gateway",
3155 503 => "Service Unavailable",
3156 504 => "Gateway Timeout",
3157 _ => "Internal Server Error",
3158 };
3159
3160 let body = format!(
3162 r#"{{"error":"{} {}","trace_id":"{}"}}"#,
3163 error_code, error_message, ctx.trace_id
3164 );
3165
3166 let mut header = pingora::http::ResponseHeader::build(error_code, None).unwrap();
3168 header
3169 .insert_header("Content-Type", "application/json")
3170 .ok();
3171 header
3172 .insert_header("Content-Length", body.len().to_string())
3173 .ok();
3174 header
3175 .insert_header("X-Correlation-Id", ctx.trace_id.as_str())
3176 .ok();
3177 header.insert_header("Connection", "close").ok();
3178
3179 if let Err(write_err) = session.write_response_header(Box::new(header), false).await {
3181 warn!(
3182 correlation_id = %ctx.trace_id,
3183 error = %write_err,
3184 "Failed to write error response header"
3185 );
3186 } else {
3187 if let Err(write_err) = session
3189 .write_response_body(Some(bytes::Bytes::from(body)), true)
3190 .await
3191 {
3192 warn!(
3193 correlation_id = %ctx.trace_id,
3194 error = %write_err,
3195 "Failed to write error response body"
3196 );
3197 }
3198 }
3199
3200 pingora_proxy::FailToProxy {
3203 error_code,
3204 can_reuse_downstream: false,
3205 }
3206 }
3207
3208 fn error_while_proxy(
3214 &self,
3215 peer: &HttpPeer,
3216 session: &mut Session,
3217 e: Box<Error>,
3218 ctx: &mut Self::CTX,
3219 client_reused: bool,
3220 ) -> Box<Error> {
3221 let error_type = e.etype().clone();
3222 let upstream_id = ctx.upstream.as_deref().unwrap_or("unknown");
3223
3224 let is_retryable = matches!(
3226 error_type,
3227 ErrorType::ConnectTimedout
3228 | ErrorType::ReadTimedout
3229 | ErrorType::WriteTimedout
3230 | ErrorType::ConnectionClosed
3231 | ErrorType::ConnectRefused
3232 );
3233
3234 warn!(
3236 correlation_id = %ctx.trace_id,
3237 route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
3238 upstream = %upstream_id,
3239 peer_address = %peer.address(),
3240 error_type = ?error_type,
3241 error = %e,
3242 client_reused = client_reused,
3243 is_retryable = is_retryable,
3244 "Error during proxy operation"
3245 );
3246
3247 let peer_address = peer.address().to_string();
3250 let upstream_pools = self.upstream_pools.clone();
3251 let upstream_id_owned = upstream_id.to_string();
3252 tokio::spawn(async move {
3253 if let Some(pool) = upstream_pools.get(&upstream_id_owned).await {
3254 pool.report_result(&peer_address, false).await;
3255 }
3256 });
3257
3258 self.metrics
3260 .record_blocked_request(&format!("proxy_error_{:?}", error_type));
3261
3262 let mut enhanced_error = e.more_context(format!(
3264 "Upstream: {}, Peer: {}, Attempts: {}",
3265 upstream_id,
3266 peer.address(),
3267 ctx.upstream_attempts
3268 ));
3269
3270 if is_retryable {
3275 let can_retry = if client_reused {
3276 !session.as_ref().retry_buffer_truncated()
3278 } else {
3279 true
3281 };
3282
3283 enhanced_error.retry.decide_reuse(can_retry);
3284
3285 if can_retry {
3286 debug!(
3287 correlation_id = %ctx.trace_id,
3288 upstream = %upstream_id,
3289 error_type = ?error_type,
3290 "Error is retryable, will attempt retry"
3291 );
3292 }
3293 } else {
3294 enhanced_error.retry.decide_reuse(false);
3296 }
3297
3298 enhanced_error
3299 }
3300
    /// End-of-request hook, called after the response was written (or the
    /// request failed). Responsibilities, in order: release the reload
    /// in-flight counter, fire any deferred shadow request, report the
    /// outcome to the adaptive load balancer, run inference token / budget /
    /// cost accounting (including PII scanning of streamed output), emit the
    /// access log entry, and close the OTel span.
    async fn logging(&self, session: &mut Session, _error: Option<&Error>, ctx: &mut Self::CTX) {
        // Balance the in-flight counter used to drain requests during config
        // reload.
        self.reload_coordinator.dec_requests();

        // Fire a shadow (mirror) request that was deferred until the request
        // body had been fully buffered.
        if !ctx.shadow_sent {
            if let Some(shadow_pending) = ctx.shadow_pending.take() {
                let body = if shadow_pending.include_body && !ctx.body_buffer.is_empty() {
                    Some(ctx.body_buffer.clone())
                } else {
                    None
                };

                trace!(
                    correlation_id = %ctx.trace_id,
                    body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
                    "Firing deferred shadow request with buffered body"
                );

                shadow_pending.manager.shadow_request(
                    shadow_pending.headers,
                    body,
                    shadow_pending.request_ctx,
                );
                ctx.shadow_sent = true;
            }
        }

        let duration = ctx.elapsed();

        // 0 means no response header was ever written to the client.
        let status = session
            .response_written()
            .map(|r| r.status.as_u16())
            .unwrap_or(0);

        // Report the request outcome (success + latency) to the adaptive
        // load balancer for the peer that actually served it.
        if let (Some(ref peer_addr), Some(ref upstream_id)) =
            (&ctx.selected_upstream_address, &ctx.upstream)
        {
            // 5xx — and "no response written" — count as failures here.
            let success = status > 0 && status < 500;

            if let Some(pool) = self.upstream_pools.get(upstream_id).await {
                pool.report_result_with_latency(peer_addr, success, Some(duration))
                    .await;
                trace!(
                    correlation_id = %ctx.trace_id,
                    upstream = %upstream_id,
                    peer_address = %peer_addr,
                    success = success,
                    duration_ms = duration.as_millis(),
                    status = status,
                    "Reported result to adaptive load balancer"
                );
            }

            // Feed per-peer latency into the warmth tracker to spot cold
            // inference models.
            if ctx.inference_rate_limit_enabled && success {
                let cold_detected = self.warmth_tracker.record_request(peer_addr, duration);
                if cold_detected {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        upstream = %upstream_id,
                        peer_address = %peer_addr,
                        duration_ms = duration.as_millis(),
                        "Cold model detected on inference upstream"
                    );
                }
            }
        }

        // Inference accounting: finalize streamed token counts, scan streamed
        // output for PII, reconcile estimated vs actual tokens, and update
        // budgets and costs.
        if ctx.inference_rate_limit_enabled {
            if let (Some(route_id), Some(ref rate_limit_key)) =
                (ctx.route_id.as_deref(), &ctx.inference_rate_limit_key)
            {
                let response_headers = session
                    .response_written()
                    .map(|r| &r.headers)
                    .cloned()
                    .unwrap_or_default();

                // For streaming (SSE) responses, token usage comes from the
                // streaming counter rather than response headers.
                let streaming_result = if ctx.inference_streaming_response {
                    ctx.inference_streaming_counter
                        .as_ref()
                        .map(|counter| counter.finalize())
                } else {
                    None
                };

                if let Some(ref result) = streaming_result {
                    debug!(
                        correlation_id = %ctx.trace_id,
                        output_tokens = result.output_tokens,
                        input_tokens = ?result.input_tokens,
                        source = ?result.source,
                        content_length = result.content_length,
                        "Finalized streaming token count"
                    );
                }

                // PII scan over the accumulated streamed response content,
                // when the route's guardrails enable it. (Non-streaming
                // responses are scanned elsewhere, in the body filter path.)
                if ctx.inference_streaming_response {
                    if let Some(ref route_config) = ctx.route_config {
                        if let Some(ref inference) = route_config.inference {
                            if let Some(ref guardrails) = inference.guardrails {
                                if let Some(ref pii_config) = guardrails.pii_detection {
                                    if pii_config.enabled {
                                        if let Some(ref counter) = ctx.inference_streaming_counter {
                                            let response_content = counter.content();
                                            if !response_content.is_empty() {
                                                let pii_result = self
                                                    .guardrail_processor
                                                    .check_pii(
                                                        pii_config,
                                                        response_content,
                                                        ctx.route_id.as_deref(),
                                                        &ctx.trace_id,
                                                    )
                                                    .await;

                                                match pii_result {
                                                    // Response already went out; detections are
                                                    // recorded for logging/metrics only, the
                                                    // redacted content is unused here.
                                                    crate::inference::PiiCheckResult::Detected {
                                                        detections,
                                                        redacted_content: _,
                                                    } => {
                                                        warn!(
                                                            correlation_id = %ctx.trace_id,
                                                            route_id = ctx.route_id.as_deref().unwrap_or("unknown"),
                                                            detection_count = detections.len(),
                                                            "PII detected in inference response"
                                                        );

                                                        ctx.pii_detection_categories = detections
                                                            .iter()
                                                            .map(|d| d.category.clone())
                                                            .collect();

                                                        for detection in &detections {
                                                            self.metrics.record_pii_detected(
                                                                ctx.route_id.as_deref().unwrap_or("unknown"),
                                                                &detection.category,
                                                            );
                                                        }
                                                    }
                                                    crate::inference::PiiCheckResult::Clean => {
                                                        trace!(
                                                            correlation_id = %ctx.trace_id,
                                                            "No PII detected in response"
                                                        );
                                                    }
                                                    crate::inference::PiiCheckResult::Error { message } => {
                                                        debug!(
                                                            correlation_id = %ctx.trace_id,
                                                            error = %message,
                                                            "PII detection check failed"
                                                        );
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                // The body was consumed during streaming; actual token counts
                // are reconciled from headers and/or the streaming counter.
                let empty_body: &[u8] = &[];

                if let Some(actual_estimate) = self.inference_rate_limit_manager.record_actual(
                    route_id,
                    rate_limit_key,
                    &response_headers,
                    empty_body,
                    ctx.inference_estimated_tokens,
                ) {
                    // Source preference: API-reported totals from the stream,
                    // then tiktoken-based streaming counts, then headers.
                    let (actual_tokens, source_info) = if let Some(ref streaming) = streaming_result
                    {
                        if streaming.total_tokens.is_some() {
                            (streaming.total_tokens.unwrap(), "streaming_api")
                        } else if actual_estimate.source == crate::inference::TokenSource::Estimated
                        {
                            let total = ctx.inference_input_tokens + streaming.output_tokens;
                            (total, "streaming_tiktoken")
                        } else {
                            (actual_estimate.tokens, "headers")
                        }
                    } else {
                        (actual_estimate.tokens, "headers")
                    };

                    ctx.inference_actual_tokens = Some(actual_tokens);

                    debug!(
                        correlation_id = %ctx.trace_id,
                        route_id = route_id,
                        estimated_tokens = ctx.inference_estimated_tokens,
                        actual_tokens = actual_tokens,
                        source = source_info,
                        streaming_response = ctx.inference_streaming_response,
                        model = ?ctx.inference_model,
                        "Recorded actual inference tokens"
                    );

                    // Charge the tenant's token budget and surface any alert
                    // thresholds crossed by this request.
                    if ctx.inference_budget_enabled {
                        let alerts = self.inference_rate_limit_manager.record_budget(
                            route_id,
                            rate_limit_key,
                            actual_tokens,
                        );

                        for alert in alerts.iter() {
                            warn!(
                                correlation_id = %ctx.trace_id,
                                route_id = route_id,
                                tenant = %alert.tenant,
                                threshold_pct = alert.threshold * 100.0,
                                tokens_used = alert.tokens_used,
                                tokens_limit = alert.tokens_limit,
                                "Token budget alert threshold crossed"
                            );
                        }

                        if let Some(status) = self
                            .inference_rate_limit_manager
                            .budget_status(route_id, rate_limit_key)
                        {
                            ctx.inference_budget_remaining = Some(status.tokens_remaining as i64);
                        }
                    }

                    // Per-request cost, split into input/output tokens.
                    if ctx.inference_cost_enabled {
                        if let Some(model) = ctx.inference_model.as_deref() {
                            let (input_tokens, output_tokens) = if let Some(ref streaming) =
                                streaming_result
                            {
                                let input =
                                    streaming.input_tokens.unwrap_or(ctx.inference_input_tokens);
                                let output = streaming.output_tokens;
                                (input, output)
                            } else {
                                // Non-streaming: derive output as total minus
                                // input, clamped at zero.
                                let input = ctx.inference_input_tokens;
                                let output = actual_tokens.saturating_sub(input);
                                (input, output)
                            };
                            ctx.inference_output_tokens = output_tokens;

                            if let Some(cost_result) = self
                                .inference_rate_limit_manager
                                .calculate_cost(route_id, model, input_tokens, output_tokens)
                            {
                                ctx.inference_request_cost = Some(cost_result.total_cost);

                                trace!(
                                    correlation_id = %ctx.trace_id,
                                    route_id = route_id,
                                    model = model,
                                    input_tokens = input_tokens,
                                    output_tokens = output_tokens,
                                    total_cost = cost_result.total_cost,
                                    currency = %cost_result.currency,
                                    "Calculated inference request cost"
                                );
                            }
                        }
                    }
                }
            }
        }

        // Structured access log entry (subject to status-based filtering).
        if self.log_manager.should_log_access(status) {
            let access_entry = AccessLogEntry {
                timestamp: chrono::Utc::now().to_rfc3339(),
                trace_id: ctx.trace_id.clone(),
                method: ctx.method.clone(),
                path: ctx.path.clone(),
                query: ctx.query.clone(),
                protocol: "HTTP/1.1".to_string(),
                status,
                body_bytes: ctx.response_bytes,
                duration_ms: duration.as_millis() as u64,
                client_ip: ctx.client_ip.clone(),
                user_agent: ctx.user_agent.clone(),
                referer: ctx.referer.clone(),
                host: ctx.host.clone(),
                route_id: ctx.route_id.clone(),
                upstream: ctx.upstream.clone(),
                upstream_attempts: ctx.upstream_attempts,
                instance_id: self.app_state.instance_id.clone(),
                namespace: ctx.namespace.clone(),
                service: ctx.service.clone(),
                body_bytes_sent: ctx.response_bytes,
                upstream_addr: ctx.selected_upstream_address.clone(),
                connection_reused: ctx.connection_reused,
                rate_limit_hit: status == 429,
                geo_country: ctx.geo_country_code.clone(),
            };
            self.log_manager.log_access(&access_entry);
        }

        // Cheap one-line summary, gated so the fields aren't formatted unless
        // debug logging is active.
        if tracing::enabled!(tracing::Level::DEBUG) {
            debug!(
                trace_id = %ctx.trace_id,
                method = %ctx.method,
                path = %ctx.path,
                route_id = ?ctx.route_id,
                upstream = ?ctx.upstream,
                status = status,
                duration_ms = duration.as_millis() as u64,
                upstream_attempts = ctx.upstream_attempts,
                error = ?_error.map(|e| e.to_string()),
                "Request completed"
            );
        }

        // 101 Switching Protocols on an upgrade request means the WebSocket
        // tunnel is live.
        if ctx.is_websocket_upgrade && status == 101 {
            info!(
                trace_id = %ctx.trace_id,
                route_id = ?ctx.route_id,
                upstream = ?ctx.upstream,
                client_ip = %ctx.client_ip,
                "WebSocket connection established"
            );
        }

        // Close out the OpenTelemetry span for this request, if one exists.
        if let Some(span) = ctx.otel_span.take() {
            span.end();
        }
    }
3660}
3661
3662impl GrapsusProxy {
    /// Stream a single request-body chunk through the body-inspection agents.
    ///
    /// Sends the chunk with ordering metadata (`chunk_index`, running byte
    /// count) to the agent manager, then applies the agent decision: pass the
    /// chunk through, mutate it, drop it, or block the whole request. Agent
    /// errors honor the route's failure mode (fail-open vs fail-closed).
    ///
    /// Returns an `HTTPStatus` error when an agent blocks the request, or a
    /// 503 when inspection fails under a fail-closed policy.
    async fn process_body_chunk_streaming(
        &self,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut RequestContext,
    ) -> Result<(), Box<Error>> {
        // Copy the chunk out so agents get a stable byte slice while `body`
        // stays mutable for in-place replacement below.
        let chunk_data: Vec<u8> = body.as_ref().map(|b| b.to_vec()).unwrap_or_default();
        let chunk_index = ctx.request_body_chunk_index;
        ctx.request_body_chunk_index += 1;
        ctx.body_bytes_inspected += chunk_data.len() as u64;

        debug!(
            correlation_id = %ctx.trace_id,
            chunk_index = chunk_index,
            chunk_size = chunk_data.len(),
            end_of_stream = end_of_stream,
            "Streaming body chunk to agents"
        );

        let agent_ctx = crate::agents::AgentCallContext {
            correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
            metadata: grapsus_agent_protocol::RequestMetadata {
                correlation_id: ctx.trace_id.clone(),
                request_id: ctx.trace_id.clone(),
                client_ip: ctx.client_ip.clone(),
                client_port: 0,
                server_name: ctx.host.clone(),
                protocol: "HTTP/1.1".to_string(),
                tls_version: None,
                tls_cipher: None,
                route_id: ctx.route_id.clone(),
                upstream_id: ctx.upstream.clone(),
                timestamp: chrono::Utc::now().to_rfc3339(),
                traceparent: ctx.traceparent(),
            },
            route_id: ctx.route_id.clone(),
            upstream_id: ctx.upstream.clone(),
            // Bodies travel through the streaming call below, not the
            // call context.
            request_body: None,
            response_body: None,
        };

        let agent_ids = ctx.body_inspection_agents.clone();
        // Total request size is unknown while streaming.
        let total_size = None;
        match self
            .agent_manager
            .process_request_body_streaming(
                &agent_ctx,
                &chunk_data,
                end_of_stream,
                chunk_index,
                ctx.body_bytes_inspected as usize,
                total_size,
                &agent_ids,
            )
            .await
        {
            Ok(decision) => {
                // Agents may ask for more chunks before reaching a verdict.
                ctx.agent_needs_more = decision.needs_more;

                // Apply any per-chunk mutation: drop the chunk entirely or
                // replace its bytes.
                if let Some(ref mutation) = decision.request_body_mutation {
                    if !mutation.is_pass_through() {
                        if mutation.is_drop() {
                            *body = None;
                            trace!(
                                correlation_id = %ctx.trace_id,
                                chunk_index = chunk_index,
                                "Agent dropped body chunk"
                            );
                        } else if let Some(ref new_data) = mutation.data {
                            *body = Some(Bytes::from(new_data.clone()));
                            trace!(
                                correlation_id = %ctx.trace_id,
                                chunk_index = chunk_index,
                                original_size = chunk_data.len(),
                                new_size = new_data.len(),
                                "Agent mutated body chunk"
                            );
                        }
                    }
                }

                // A final (no more chunks wanted) non-allow decision blocks
                // the request with the agent-provided status and body.
                if !decision.needs_more && !decision.is_allow() {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        action = ?decision.action,
                        "Agent blocked request body"
                    );
                    self.metrics.record_blocked_request("agent_body_inspection");

                    let (status, message) = match &decision.action {
                        crate::agents::AgentAction::Block { status, body, .. } => (
                            *status,
                            body.clone().unwrap_or_else(|| "Blocked".to_string()),
                        ),
                        _ => (403, "Forbidden".to_string()),
                    };

                    return Err(Error::explain(ErrorType::HTTPStatus(status), message));
                }

                trace!(
                    correlation_id = %ctx.trace_id,
                    needs_more = decision.needs_more,
                    "Agent processed body chunk"
                );
            }
            Err(e) => {
                // Failure handling follows the route policy: Closed blocks
                // the request; otherwise log and let the chunk through.
                let fail_closed = ctx
                    .route_config
                    .as_ref()
                    .map(|r| r.policies.failure_mode == grapsus_config::FailureMode::Closed)
                    .unwrap_or(false);

                if fail_closed {
                    error!(
                        correlation_id = %ctx.trace_id,
                        error = %e,
                        "Agent streaming body inspection failed, blocking (fail-closed)"
                    );
                    self.log_manager.log_request_error(
                        "error",
                        "Agent streaming body inspection failed, blocking (fail-closed)",
                        &ctx.trace_id,
                        ctx.route_id.as_deref(),
                        ctx.upstream.as_deref(),
                        Some(format!("error={}", e)),
                    );
                    return Err(Error::explain(
                        ErrorType::HTTPStatus(503),
                        "Service unavailable",
                    ));
                } else {
                    warn!(
                        correlation_id = %ctx.trace_id,
                        error = %e,
                        "Agent streaming body inspection failed, allowing (fail-open)"
                    );
                    self.log_manager.log_request_error(
                        "warn",
                        "Agent streaming body inspection failed, allowing (fail-open)",
                        &ctx.trace_id,
                        ctx.route_id.as_deref(),
                        ctx.upstream.as_deref(),
                        Some(format!("error={}", e)),
                    );
                }
            }
        }

        Ok(())
    }
3827
3828 async fn send_buffered_body_to_agents(
3830 &self,
3831 end_of_stream: bool,
3832 ctx: &mut RequestContext,
3833 ) -> Result<(), Box<Error>> {
3834 debug!(
3835 correlation_id = %ctx.trace_id,
3836 buffer_size = ctx.body_buffer.len(),
3837 end_of_stream = end_of_stream,
3838 agent_count = ctx.body_inspection_agents.len(),
3839 decompression_enabled = ctx.decompression_enabled,
3840 "Sending buffered body to agents for inspection"
3841 );
3842
3843 let body_for_inspection = if ctx.decompression_enabled {
3845 if let Some(ref encoding) = ctx.body_content_encoding {
3846 let config = crate::decompression::DecompressionConfig {
3847 max_ratio: ctx.max_decompression_ratio,
3848 max_output_bytes: ctx.max_decompression_bytes,
3849 };
3850
3851 match crate::decompression::decompress_body(&ctx.body_buffer, encoding, &config) {
3852 Ok(result) => {
3853 ctx.body_was_decompressed = true;
3854 self.metrics
3855 .record_decompression_success(encoding, result.ratio);
3856 debug!(
3857 correlation_id = %ctx.trace_id,
3858 encoding = %encoding,
3859 compressed_size = result.compressed_size,
3860 decompressed_size = result.decompressed_size,
3861 ratio = result.ratio,
3862 "Body decompressed for agent inspection"
3863 );
3864 result.data
3865 }
3866 Err(e) => {
3867 let failure_reason = match &e {
3869 crate::decompression::DecompressionError::RatioExceeded { .. } => {
3870 "ratio_exceeded"
3871 }
3872 crate::decompression::DecompressionError::SizeExceeded { .. } => {
3873 "size_exceeded"
3874 }
3875 crate::decompression::DecompressionError::InvalidData { .. } => {
3876 "invalid_data"
3877 }
3878 crate::decompression::DecompressionError::UnsupportedEncoding {
3879 ..
3880 } => "unsupported",
3881 crate::decompression::DecompressionError::IoError(_) => "io_error",
3882 };
3883 self.metrics
3884 .record_decompression_failure(encoding, failure_reason);
3885
3886 let fail_closed = ctx
3888 .route_config
3889 .as_ref()
3890 .map(|r| {
3891 r.policies.failure_mode == grapsus_config::FailureMode::Closed
3892 })
3893 .unwrap_or(false);
3894
3895 if fail_closed {
3896 error!(
3897 correlation_id = %ctx.trace_id,
3898 error = %e,
3899 encoding = %encoding,
3900 "Decompression failed, blocking (fail-closed)"
3901 );
3902 return Err(Error::explain(
3903 ErrorType::HTTPStatus(400),
3904 "Invalid compressed body",
3905 ));
3906 } else {
3907 warn!(
3908 correlation_id = %ctx.trace_id,
3909 error = %e,
3910 encoding = %encoding,
3911 "Decompression failed, sending compressed body (fail-open)"
3912 );
3913 ctx.body_buffer.clone()
3914 }
3915 }
3916 }
3917 } else {
3918 ctx.body_buffer.clone()
3919 }
3920 } else {
3921 ctx.body_buffer.clone()
3922 };
3923
3924 let agent_ctx = crate::agents::AgentCallContext {
3925 correlation_id: grapsus_common::CorrelationId::from_string(&ctx.trace_id),
3926 metadata: grapsus_agent_protocol::RequestMetadata {
3927 correlation_id: ctx.trace_id.clone(),
3928 request_id: ctx.trace_id.clone(),
3929 client_ip: ctx.client_ip.clone(),
3930 client_port: 0,
3931 server_name: ctx.host.clone(),
3932 protocol: "HTTP/1.1".to_string(),
3933 tls_version: None,
3934 tls_cipher: None,
3935 route_id: ctx.route_id.clone(),
3936 upstream_id: ctx.upstream.clone(),
3937 timestamp: chrono::Utc::now().to_rfc3339(),
3938 traceparent: ctx.traceparent(),
3939 },
3940 route_id: ctx.route_id.clone(),
3941 upstream_id: ctx.upstream.clone(),
3942 request_body: Some(body_for_inspection.clone()),
3943 response_body: None,
3944 };
3945
3946 let agent_ids = ctx.body_inspection_agents.clone();
3947 match self
3948 .agent_manager
3949 .process_request_body(&agent_ctx, &body_for_inspection, end_of_stream, &agent_ids)
3950 .await
3951 {
3952 Ok(decision) => {
3953 if !decision.is_allow() {
3954 warn!(
3955 correlation_id = %ctx.trace_id,
3956 action = ?decision.action,
3957 "Agent blocked request body"
3958 );
3959 self.metrics.record_blocked_request("agent_body_inspection");
3960
3961 let (status, message) = match &decision.action {
3962 crate::agents::AgentAction::Block { status, body, .. } => (
3963 *status,
3964 body.clone().unwrap_or_else(|| "Blocked".to_string()),
3965 ),
3966 _ => (403, "Forbidden".to_string()),
3967 };
3968
3969 return Err(Error::explain(ErrorType::HTTPStatus(status), message));
3970 }
3971
3972 trace!(
3973 correlation_id = %ctx.trace_id,
3974 "Agent allowed request body"
3975 );
3976 }
3977 Err(e) => {
3978 let fail_closed = ctx
3979 .route_config
3980 .as_ref()
3981 .map(|r| r.policies.failure_mode == grapsus_config::FailureMode::Closed)
3982 .unwrap_or(false);
3983
3984 if fail_closed {
3985 error!(
3986 correlation_id = %ctx.trace_id,
3987 error = %e,
3988 "Agent body inspection failed, blocking (fail-closed)"
3989 );
3990 self.log_manager.log_request_error(
3991 "error",
3992 "Agent body inspection failed, blocking (fail-closed)",
3993 &ctx.trace_id,
3994 ctx.route_id.as_deref(),
3995 ctx.upstream.as_deref(),
3996 Some(format!("error={}", e)),
3997 );
3998 return Err(Error::explain(
3999 ErrorType::HTTPStatus(503),
4000 "Service unavailable",
4001 ));
4002 } else {
4003 warn!(
4004 correlation_id = %ctx.trace_id,
4005 error = %e,
4006 "Agent body inspection failed, allowing (fail-open)"
4007 );
4008 self.log_manager.log_request_error(
4009 "warn",
4010 "Agent body inspection failed, allowing (fail-open)",
4011 &ctx.trace_id,
4012 ctx.route_id.as_deref(),
4013 ctx.upstream.as_deref(),
4014 Some(format!("error={}", e)),
4015 );
4016 }
4017 }
4018 }
4019
4020 Ok(())
4021 }
4022}