1use bytes::Bytes;
7use http::{Request, Response};
8use http_body::Body as HttpBody;
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14use tower_service::Service;
15
16use crate::body::DebugBody;
17use crate::layer::DebugConfig;
18
19#[derive(Debug, Clone)]
21pub struct DebugService<S> {
22 inner: S,
23 config: DebugConfig,
24}
25
26impl<S> DebugService<S> {
27 pub fn new(inner: S, config: DebugConfig) -> Self {
29 Self { inner, config }
30 }
31}
32
33impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for DebugService<S>
34where
35 S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
36 S::Future: Send + 'static,
37 S::Error: std::fmt::Display + Send + 'static,
38 ReqBody: HttpBody<Data = Bytes> + Send + 'static,
39 ReqBody::Error: std::fmt::Display,
40 ResBody: HttpBody<Data = Bytes> + Send + 'static,
41 ResBody::Error: std::fmt::Display + Send,
42{
43 type Response = Response<DebugBody<ResBody>>;
44 type Error = S::Error;
45 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
46
47 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48 self.inner.poll_ready(cx)
49 }
50
51 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
52 let config = self.config.clone();
53 let mut inner = self.inner.clone();
54 std::mem::swap(&mut self.inner, &mut inner);
56
57 Box::pin(async move {
58 let start = tokio::time::Instant::now();
59
60 let method = req.uri().path().to_string();
62 let http_method = req.method().clone();
63
64 tracing::info!(
66 method = %method,
67 http_method = %http_method,
68 "→ gRPC request"
69 );
70
71 if config.log_headers {
72 let headers = req.headers();
73 let content_type = headers
74 .get("content-type")
75 .and_then(|v| v.to_str().ok())
76 .unwrap_or("unknown");
77 let authority = req
78 .uri()
79 .authority()
80 .map(|a| a.to_string())
81 .unwrap_or_default();
82 let user_agent = headers
83 .get("user-agent")
84 .and_then(|v| v.to_str().ok())
85 .unwrap_or("unknown");
86 let grpc_timeout = headers.get("grpc-timeout").and_then(|v| v.to_str().ok());
87
88 tracing::debug!(
89 method = %method,
90 content_type = content_type,
91 authority = %authority,
92 user_agent = user_agent,
93 grpc_timeout = ?grpc_timeout,
94 "→ gRPC request headers"
95 );
96
97 let custom_metadata: Vec<_> = headers
99 .iter()
100 .filter(|(name, _)| {
101 let n = name.as_str();
102 !n.starts_with(':')
103 && n != "content-type"
104 && n != "user-agent"
105 && n != "te"
106 && n != "grpc-timeout"
107 && n != "grpc-encoding"
108 && n != "grpc-accept-encoding"
109 })
110 .map(|(name, value)| {
111 format!("{}={}", name.as_str(), value.to_str().unwrap_or("<binary>"))
112 })
113 .collect();
114
115 if !custom_metadata.is_empty() {
116 tracing::debug!(
117 method = %method,
118 metadata = ?custom_metadata,
119 "→ gRPC custom metadata"
120 );
121 }
122 }
123
124 let response = inner.call(req).await;
126
127 let elapsed = start.elapsed();
128
129 match response {
130 Ok(resp) => {
131 let status = resp.status();
132 let grpc_status = resp
133 .headers()
134 .get("grpc-status")
135 .and_then(|v| v.to_str().ok())
136 .map(String::from);
137
138 if let Some(ref gs) = grpc_status {
139 if gs != "0" {
140 let grpc_message = resp
141 .headers()
142 .get("grpc-message")
143 .and_then(|v| v.to_str().ok())
144 .unwrap_or("");
145 tracing::warn!(
146 method = %method,
147 http_status = %status,
148 grpc_status = %gs,
149 grpc_message = grpc_message,
150 elapsed_ms = elapsed.as_millis() as u64,
151 "← gRPC response (error)"
152 );
153 } else {
154 tracing::info!(
155 method = %method,
156 http_status = %status,
157 grpc_status = %gs,
158 elapsed_ms = elapsed.as_millis() as u64,
159 "← gRPC response"
160 );
161 }
162 } else {
163 tracing::info!(
165 method = %method,
166 http_status = %status,
167 elapsed_ms = elapsed.as_millis() as u64,
168 "← gRPC response (status in trailers)"
169 );
170 }
171
172 if config.log_headers {
173 tracing::debug!(
174 method = %method,
175 headers = ?resp.headers(),
176 "← gRPC response headers"
177 );
178 }
179
180 let (parts, body) = resp.into_parts();
182 let debug_body = DebugBody::new(
183 body,
184 method,
185 config.log_response_frames,
186 config.max_body_bytes,
187 );
188 Ok(Response::from_parts(parts, debug_body))
189 }
190 Err(e) => {
191 tracing::error!(
192 method = %method,
193 elapsed_ms = elapsed.as_millis() as u64,
194 error = %e,
195 "← gRPC call failed"
196 );
197 Err(e)
198 }
199 }
200 })
201 }
202}