1use bytes::Bytes;
7use http::{Request, Response};
8use http_body::Body as HttpBody;
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14use tower_service::Service;
15
16use crate::body::DebugBody;
17use crate::layer::DebugConfig;
18
19#[derive(Debug, Clone)]
21pub struct DebugService<S> {
22 inner: S,
23 config: DebugConfig,
24}
25
26impl<S> DebugService<S> {
27 pub fn new(inner: S, config: DebugConfig) -> Self {
29 Self { inner, config }
30 }
31}
32
33impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for DebugService<S>
34where
35 S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
36 S::Future: Send + 'static,
37 S::Error: std::fmt::Display + Send + 'static,
38 ReqBody: HttpBody<Data = Bytes> + Send + 'static,
39 ReqBody::Error: std::fmt::Display,
40 ResBody: HttpBody<Data = Bytes> + Send + 'static,
41 ResBody::Error: std::fmt::Display + Send,
42{
43 type Response = Response<DebugBody<ResBody>>;
44 type Error = S::Error;
45 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
46
47 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48 self.inner.poll_ready(cx)
49 }
50
51 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
52 let config = self.config.clone();
53 let mut inner = self.inner.clone();
54 std::mem::swap(&mut self.inner, &mut inner);
56
57 Box::pin(async move {
58 let start = tokio::time::Instant::now();
59
60 let method = req.uri().path().to_string();
62 let http_method = req.method().clone();
63
64 tracing::info!(
66 method = %method,
67 http_method = %http_method,
68 "→ gRPC request"
69 );
70
71 if config.log_headers {
72 let headers = req.headers();
73 let content_type = headers
74 .get("content-type")
75 .and_then(|v| v.to_str().ok())
76 .unwrap_or("unknown");
77 let authority = req
78 .uri()
79 .authority()
80 .map(|a| a.to_string())
81 .unwrap_or_default();
82 let user_agent = headers
83 .get("user-agent")
84 .and_then(|v| v.to_str().ok())
85 .unwrap_or("unknown");
86 let grpc_timeout = headers.get("grpc-timeout").and_then(|v| v.to_str().ok());
87
88 let custom_metadata: Vec<_> = headers
90 .iter()
91 .filter(|(name, _)| {
92 let n = name.as_str();
93 !n.starts_with(':')
94 && n != "content-type"
95 && n != "user-agent"
96 && n != "te"
97 && n != "grpc-timeout"
98 && n != "grpc-encoding"
99 && n != "grpc-accept-encoding"
100 })
101 .map(|(name, value)| {
102 if !config.reveal_sensitive_headers
103 && config.sensitive_headers.contains(name)
104 {
105 format!("{}=[REDACTED]", name.as_str())
106 } else {
107 format!("{}={}", name.as_str(), value.to_str().unwrap_or("<binary>"))
108 }
109 })
110 .collect();
111
112 if !custom_metadata.is_empty() {
113 tracing::debug!(
114 method = %method,
115 metadata = ?custom_metadata,
116 content_type = content_type,
117 authority = %authority,
118 user_agent = user_agent,
119 grpc_timeout = ?grpc_timeout,
120 "→ gRPC request headers"
121 );
122 } else {
123 tracing::debug!(
124 method = %method,
125 content_type = content_type,
126 authority = %authority,
127 user_agent = user_agent,
128 grpc_timeout = ?grpc_timeout,
129 "→ gRPC request headers"
130 );
131
132 }
133 }
134
135 let response = inner.call(req).await;
137
138 let elapsed = start.elapsed();
139
140 match response {
141 Ok(resp) => {
142 let status = resp.status();
143 let grpc_status = resp
144 .headers()
145 .get("grpc-status")
146 .and_then(|v| v.to_str().ok())
147 .map(String::from);
148
149 if let Some(ref gs) = grpc_status {
150 if gs != "0" {
151 let grpc_message = resp
152 .headers()
153 .get("grpc-message")
154 .and_then(|v| v.to_str().ok())
155 .unwrap_or("");
156 tracing::warn!(
157 method = %method,
158 http_status = %status,
159 grpc_status = %gs,
160 grpc_message = grpc_message,
161 elapsed_ms = elapsed.as_millis() as u64,
162 "← gRPC response (error)"
163 );
164 } else {
165 tracing::info!(
166 method = %method,
167 http_status = %status,
168 grpc_status = %gs,
169 elapsed_ms = elapsed.as_millis() as u64,
170 "← gRPC response"
171 );
172 }
173 } else {
174 tracing::info!(
176 method = %method,
177 http_status = %status,
178 elapsed_ms = elapsed.as_millis() as u64,
179 "← gRPC response (status in trailers)"
180 );
181 }
182
183 if config.log_headers {
184 tracing::debug!(
185 method = %method,
186 headers = ?resp.headers(),
187 "← gRPC response headers"
188 );
189 }
190
191 let (parts, body) = resp.into_parts();
193 let debug_body = DebugBody::new(
194 body,
195 method,
196 config.log_response_frames,
197 config.max_body_bytes,
198 );
199 Ok(Response::from_parts(parts, debug_body))
200 }
201 Err(e) => {
202 tracing::error!(
203 method = %method,
204 elapsed_ms = elapsed.as_millis() as u64,
205 error = %e,
206 "← gRPC call failed"
207 );
208 Err(e)
209 }
210 }
211 })
212 }
213}