Skip to main content

tonic_debug/
service.rs

1//! Tower Service implementation for gRPC request/response interception.
2//!
3//! [`DebugService`] intercepts every gRPC call, logs request metadata and
4//! body contents, forwards to the inner service, and then logs the response.
5
6use bytes::Bytes;
7use http::{Request, Response};
8use http_body::Body as HttpBody;
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14use tower_service::Service;
15
16use crate::body::DebugBody;
17use crate::layer::DebugConfig;
18
19/// A Tower service that intercepts and logs gRPC requests and responses.
20#[derive(Debug, Clone)]
21pub struct DebugService<S> {
22    inner: S,
23    config: DebugConfig,
24}
25
26impl<S> DebugService<S> {
27    /// Create a new `DebugService` wrapping the given service.
28    pub fn new(inner: S, config: DebugConfig) -> Self {
29        Self { inner, config }
30    }
31}
32
33impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for DebugService<S>
34where
35    S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
36    S::Future: Send + 'static,
37    S::Error: std::fmt::Display + Send + 'static,
38    ReqBody: HttpBody<Data = Bytes> + Send + 'static,
39    ReqBody::Error: std::fmt::Display,
40    ResBody: HttpBody<Data = Bytes> + Send + 'static,
41    ResBody::Error: std::fmt::Display + Send,
42{
43    type Response = Response<DebugBody<ResBody>>;
44    type Error = S::Error;
45    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
46
47    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48        self.inner.poll_ready(cx)
49    }
50
51    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
52        let config = self.config.clone();
53        let mut inner = self.inner.clone();
54        // Per tower best practice: swap the clone so the ready state is preserved.
55        std::mem::swap(&mut self.inner, &mut inner);
56
57        Box::pin(async move {
58            let start = tokio::time::Instant::now();
59
60            // Extract gRPC method path from the URI
61            let method = req.uri().path().to_string();
62            let http_method = req.method().clone();
63
64            // Log request metadata
65            tracing::info!(
66                method = %method,
67                http_method = %http_method,
68                "→ gRPC request"
69            );
70
71            if config.log_headers {
72                let headers = req.headers();
73                let content_type = headers
74                    .get("content-type")
75                    .and_then(|v| v.to_str().ok())
76                    .unwrap_or("unknown");
77                let authority = req
78                    .uri()
79                    .authority()
80                    .map(|a| a.to_string())
81                    .unwrap_or_default();
82                let user_agent = headers
83                    .get("user-agent")
84                    .and_then(|v| v.to_str().ok())
85                    .unwrap_or("unknown");
86                let grpc_timeout = headers.get("grpc-timeout").and_then(|v| v.to_str().ok());
87
88                tracing::debug!(
89                    method = %method,
90                    content_type = content_type,
91                    authority = %authority,
92                    user_agent = user_agent,
93                    grpc_timeout = ?grpc_timeout,
94                    "→ gRPC request headers"
95                );
96
97                // Log custom metadata (headers not starting with standard prefixes)
98                let custom_metadata: Vec<_> = headers
99                    .iter()
100                    .filter(|(name, _)| {
101                        let n = name.as_str();
102                        !n.starts_with(':')
103                            && n != "content-type"
104                            && n != "user-agent"
105                            && n != "te"
106                            && n != "grpc-timeout"
107                            && n != "grpc-encoding"
108                            && n != "grpc-accept-encoding"
109                    })
110                    .map(|(name, value)| {
111                        format!("{}={}", name.as_str(), value.to_str().unwrap_or("<binary>"))
112                    })
113                    .collect();
114
115                if !custom_metadata.is_empty() {
116                    tracing::debug!(
117                        method = %method,
118                        metadata = ?custom_metadata,
119                        "→ gRPC custom metadata"
120                    );
121                }
122            }
123
124            // Call the inner service
125            let response = inner.call(req).await;
126
127            let elapsed = start.elapsed();
128
129            match response {
130                Ok(resp) => {
131                    let status = resp.status();
132                    let grpc_status = resp
133                        .headers()
134                        .get("grpc-status")
135                        .and_then(|v| v.to_str().ok())
136                        .map(String::from);
137
138                    if let Some(ref gs) = grpc_status {
139                        if gs != "0" {
140                            let grpc_message = resp
141                                .headers()
142                                .get("grpc-message")
143                                .and_then(|v| v.to_str().ok())
144                                .unwrap_or("");
145                            tracing::warn!(
146                                method = %method,
147                                http_status = %status,
148                                grpc_status = %gs,
149                                grpc_message = grpc_message,
150                                elapsed_ms = elapsed.as_millis() as u64,
151                                "← gRPC response (error)"
152                            );
153                        } else {
154                            tracing::info!(
155                                method = %method,
156                                http_status = %status,
157                                grpc_status = %gs,
158                                elapsed_ms = elapsed.as_millis() as u64,
159                                "← gRPC response"
160                            );
161                        }
162                    } else {
163                        // gRPC status will come in trailers
164                        tracing::info!(
165                            method = %method,
166                            http_status = %status,
167                            elapsed_ms = elapsed.as_millis() as u64,
168                            "← gRPC response (status in trailers)"
169                        );
170                    }
171
172                    if config.log_headers {
173                        tracing::debug!(
174                            method = %method,
175                            headers = ?resp.headers(),
176                            "← gRPC response headers"
177                        );
178                    }
179
180                    // Wrap the response body for frame-level logging
181                    let (parts, body) = resp.into_parts();
182                    let debug_body = DebugBody::new(
183                        body,
184                        method,
185                        config.log_response_frames,
186                        config.max_body_bytes,
187                    );
188                    Ok(Response::from_parts(parts, debug_body))
189                }
190                Err(e) => {
191                    tracing::error!(
192                        method = %method,
193                        elapsed_ms = elapsed.as_millis() as u64,
194                        error = %e,
195                        "← gRPC call failed"
196                    );
197                    Err(e)
198                }
199            }
200        })
201    }
202}