Skip to main content

tonic_debug/
service.rs

1//! Tower Service implementation for gRPC request/response interception.
2//!
3//! [`DebugService`] intercepts every gRPC call, logs request metadata and
4//! body contents, forwards to the inner service, and then logs the response.
5
6use bytes::Bytes;
7use http::{Request, Response};
8use http_body::Body as HttpBody;
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14use tower_service::Service;
15
16use crate::body::DebugBody;
17use crate::layer::DebugConfig;
18
19/// A Tower service that intercepts and logs gRPC requests and responses.
20#[derive(Debug, Clone)]
21pub struct DebugService<S> {
22    inner: S,
23    config: DebugConfig,
24}
25
26impl<S> DebugService<S> {
27    /// Create a new `DebugService` wrapping the given service.
28    pub fn new(inner: S, config: DebugConfig) -> Self {
29        Self { inner, config }
30    }
31}
32
33impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for DebugService<S>
34where
35    S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
36    S::Future: Send + 'static,
37    S::Error: std::fmt::Display + Send + 'static,
38    ReqBody: HttpBody<Data = Bytes> + Send + 'static,
39    ReqBody::Error: std::fmt::Display,
40    ResBody: HttpBody<Data = Bytes> + Send + 'static,
41    ResBody::Error: std::fmt::Display + Send,
42{
43    type Response = Response<DebugBody<ResBody>>;
44    type Error = S::Error;
45    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
46
47    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48        self.inner.poll_ready(cx)
49    }
50
51    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
52        let config = self.config.clone();
53        let mut inner = self.inner.clone();
54        // Per tower best practice: swap the clone so the ready state is preserved.
55        std::mem::swap(&mut self.inner, &mut inner);
56
57        Box::pin(async move {
58            let start = tokio::time::Instant::now();
59
60            // Extract gRPC method path from the URI
61            let method = req.uri().path().to_string();
62            let http_method = req.method().clone();
63
64            // Log request metadata
65            tracing::info!(
66                method = %method,
67                http_method = %http_method,
68                "→ gRPC request"
69            );
70
71            if config.log_headers {
72                let headers = req.headers();
73                let content_type = headers
74                    .get("content-type")
75                    .and_then(|v| v.to_str().ok())
76                    .unwrap_or("unknown");
77                let authority = req
78                    .uri()
79                    .authority()
80                    .map(|a| a.to_string())
81                    .unwrap_or_default();
82                let user_agent = headers
83                    .get("user-agent")
84                    .and_then(|v| v.to_str().ok())
85                    .unwrap_or("unknown");
86                let grpc_timeout = headers.get("grpc-timeout").and_then(|v| v.to_str().ok());
87
88                // Log custom metadata (headers not starting with standard prefixes)
89                let custom_metadata: Vec<_> = headers
90                    .iter()
91                    .filter(|(name, _)| {
92                        let n = name.as_str();
93                        !n.starts_with(':')
94                            && n != "content-type"
95                            && n != "user-agent"
96                            && n != "te"
97                            && n != "grpc-timeout"
98                            && n != "grpc-encoding"
99                            && n != "grpc-accept-encoding"
100                    })
101                    .map(|(name, value)| {
102                        if !config.reveal_sensitive_headers
103                            && config.sensitive_headers.contains(name)
104                        {
105                            format!("{}=[REDACTED]", name.as_str())
106                        } else {
107                            format!("{}={}", name.as_str(), value.to_str().unwrap_or("<binary>"))
108                        }
109                    })
110                    .collect();
111
112                if !custom_metadata.is_empty() {
113                    tracing::debug!(
114                        method = %method,
115                        metadata = ?custom_metadata,
116                        content_type = content_type,
117                        authority = %authority,
118                        user_agent = user_agent,
119                        grpc_timeout = ?grpc_timeout,
120                        "→ gRPC request headers"
121                    );
122                } else {
123                    tracing::debug!(
124                        method = %method,
125                        content_type = content_type,
126                        authority = %authority,
127                        user_agent = user_agent,
128                        grpc_timeout = ?grpc_timeout,
129                        "→ gRPC request headers"
130                    );
131
132                }
133            }
134
135            // Call the inner service
136            let response = inner.call(req).await;
137
138            let elapsed = start.elapsed();
139
140            match response {
141                Ok(resp) => {
142                    let status = resp.status();
143                    let grpc_status = resp
144                        .headers()
145                        .get("grpc-status")
146                        .and_then(|v| v.to_str().ok())
147                        .map(String::from);
148
149                    if let Some(ref gs) = grpc_status {
150                        if gs != "0" {
151                            let grpc_message = resp
152                                .headers()
153                                .get("grpc-message")
154                                .and_then(|v| v.to_str().ok())
155                                .unwrap_or("");
156                            tracing::warn!(
157                                method = %method,
158                                http_status = %status,
159                                grpc_status = %gs,
160                                grpc_message = grpc_message,
161                                elapsed_ms = elapsed.as_millis() as u64,
162                                "← gRPC response (error)"
163                            );
164                        } else {
165                            tracing::info!(
166                                method = %method,
167                                http_status = %status,
168                                grpc_status = %gs,
169                                elapsed_ms = elapsed.as_millis() as u64,
170                                "← gRPC response"
171                            );
172                        }
173                    } else {
174                        // gRPC status will come in trailers
175                        tracing::info!(
176                            method = %method,
177                            http_status = %status,
178                            elapsed_ms = elapsed.as_millis() as u64,
179                            "← gRPC response (status in trailers)"
180                        );
181                    }
182
183                    if config.log_headers {
184                        tracing::debug!(
185                            method = %method,
186                            headers = ?resp.headers(),
187                            "← gRPC response headers"
188                        );
189                    }
190
191                    // Wrap the response body for frame-level logging
192                    let (parts, body) = resp.into_parts();
193                    let debug_body = DebugBody::new(
194                        body,
195                        method,
196                        config.log_response_frames,
197                        config.max_body_bytes,
198                    );
199                    Ok(Response::from_parts(parts, debug_body))
200                }
201                Err(e) => {
202                    tracing::error!(
203                        method = %method,
204                        elapsed_ms = elapsed.as_millis() as u64,
205                        error = %e,
206                        "← gRPC call failed"
207                    );
208                    Err(e)
209                }
210            }
211        })
212    }
213}