Skip to main content

tonic_debug/
body.rs

1//! Response body wrapper for capturing gRPC response data.
2//!
3//! Wraps the inner HTTP response body so that frames can be inspected
4//! and logged as they stream through the middleware.
5
6use bytes::Bytes;
7use http_body::Frame;
8use pin_project_lite::pin_project;
9use std::{
10    pin::Pin,
11    task::{Context, Poll},
12};
13use tracing;
14
15use crate::inspect;
16
17pin_project! {
18    /// A wrapper around an HTTP response body that logs gRPC frames as they are streamed.
19    pub struct DebugBody<B> {
20        #[pin]
21        inner: B,
22        method: String,
23        log_body: bool,
24        max_capture_bytes: usize,
25        captured: Vec<u8>,
26    }
27}
28
29impl<B> DebugBody<B> {
30    /// Create a new `DebugBody` wrapping the given body.
31    pub fn new(inner: B, method: String, log_body: bool, max_capture_bytes: usize) -> Self {
32        Self {
33            inner,
34            method,
35            log_body,
36            max_capture_bytes,
37            captured: Vec::new(),
38        }
39    }
40}
41
42impl<B> http_body::Body for DebugBody<B>
43where
44    B: http_body::Body<Data = Bytes>,
45    B::Error: std::fmt::Display,
46{
47    type Data = Bytes;
48    type Error = B::Error;
49
50    fn poll_frame(
51        self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
54        let this = self.project();
55
56        match this.inner.poll_frame(cx) {
57            Poll::Ready(Some(Ok(frame))) => {
58                if *this.log_body {
59                    if let Some(data) = frame.data_ref() {
60                        let bytes = data.as_ref();
61                        // Accumulate bytes for inspection
62                        let remaining = this.max_capture_bytes.saturating_sub(this.captured.len());
63                        let to_capture = bytes.len().min(remaining);
64                        this.captured.extend_from_slice(&bytes[..to_capture]);
65
66                        let formatted = inspect::format_grpc_message(bytes);
67                        tracing::debug!(
68                            method = %this.method,
69                            frame_size = bytes.len(),
70                            "gRPC response frame:\n{}",
71                            formatted
72                        );
73                    }
74
75                    if let Some(trailers) = frame.trailers_ref() {
76                        let grpc_status = trailers
77                            .get("grpc-status")
78                            .and_then(|v| v.to_str().ok())
79                            .unwrap_or("unknown");
80                        let grpc_message = trailers
81                            .get("grpc-message")
82                            .and_then(|v| v.to_str().ok())
83                            .unwrap_or("");
84
85                        if grpc_status != "0" {
86                            tracing::warn!(
87                                method = %this.method,
88                                grpc_status = grpc_status,
89                                grpc_message = grpc_message,
90                                "gRPC response trailers indicate error"
91                            );
92                        } else {
93                            tracing::debug!(
94                                method = %this.method,
95                                grpc_status = grpc_status,
96                                "gRPC response trailers OK"
97                            );
98                        }
99                    }
100                }
101                Poll::Ready(Some(Ok(frame)))
102            }
103            Poll::Ready(Some(Err(e))) => {
104                tracing::error!(
105                    method = %this.method,
106                    error = %e,
107                    "gRPC response body error"
108                );
109                Poll::Ready(Some(Err(e)))
110            }
111            Poll::Ready(None) => {
112                if *this.log_body && !this.captured.is_empty() {
113                    tracing::trace!(
114                        method = %this.method,
115                        total_response_bytes = this.captured.len(),
116                        "gRPC response stream completed"
117                    );
118                }
119                Poll::Ready(None)
120            }
121            Poll::Pending => Poll::Pending,
122        }
123    }
124
125    fn is_end_stream(&self) -> bool {
126        self.inner.is_end_stream()
127    }
128
129    fn size_hint(&self) -> http_body::SizeHint {
130        self.inner.size_hint()
131    }
132}