1use bytes::Bytes;
7use http_body::Frame;
8use pin_project_lite::pin_project;
9use std::{
10 pin::Pin,
11 task::{Context, Poll},
12};
13use tracing;
14
15use crate::inspect;
16
17pin_project! {
18 pub struct DebugBody<B> {
20 #[pin]
21 inner: B,
22 method: String,
23 log_body: bool,
24 max_capture_bytes: usize,
25 captured: Vec<u8>,
26 }
27}
28
29impl<B> DebugBody<B> {
30 pub fn new(inner: B, method: String, log_body: bool, max_capture_bytes: usize) -> Self {
32 Self {
33 inner,
34 method,
35 log_body,
36 max_capture_bytes,
37 captured: Vec::new(),
38 }
39 }
40}
41
42impl<B> http_body::Body for DebugBody<B>
43where
44 B: http_body::Body<Data = Bytes>,
45 B::Error: std::fmt::Display,
46{
47 type Data = Bytes;
48 type Error = B::Error;
49
50 fn poll_frame(
51 self: Pin<&mut Self>,
52 cx: &mut Context<'_>,
53 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
54 let this = self.project();
55
56 match this.inner.poll_frame(cx) {
57 Poll::Ready(Some(Ok(frame))) => {
58 if *this.log_body {
59 if let Some(data) = frame.data_ref() {
60 let bytes = data.as_ref();
61 let remaining = this.max_capture_bytes.saturating_sub(this.captured.len());
63 let to_capture = bytes.len().min(remaining);
64 this.captured.extend_from_slice(&bytes[..to_capture]);
65
66 let formatted = inspect::format_grpc_message(bytes);
67 tracing::debug!(
68 method = %this.method,
69 frame_size = bytes.len(),
70 "gRPC response frame:\n{}",
71 formatted
72 );
73 }
74
75 if let Some(trailers) = frame.trailers_ref() {
76 let grpc_status = trailers
77 .get("grpc-status")
78 .and_then(|v| v.to_str().ok())
79 .unwrap_or("unknown");
80 let grpc_message = trailers
81 .get("grpc-message")
82 .and_then(|v| v.to_str().ok())
83 .unwrap_or("");
84
85 if grpc_status != "0" {
86 tracing::warn!(
87 method = %this.method,
88 grpc_status = grpc_status,
89 grpc_message = grpc_message,
90 "gRPC response trailers indicate error"
91 );
92 } else {
93 tracing::debug!(
94 method = %this.method,
95 grpc_status = grpc_status,
96 "gRPC response trailers OK"
97 );
98 }
99 }
100 }
101 Poll::Ready(Some(Ok(frame)))
102 }
103 Poll::Ready(Some(Err(e))) => {
104 tracing::error!(
105 method = %this.method,
106 error = %e,
107 "gRPC response body error"
108 );
109 Poll::Ready(Some(Err(e)))
110 }
111 Poll::Ready(None) => {
112 if *this.log_body && !this.captured.is_empty() {
113 tracing::trace!(
114 method = %this.method,
115 total_response_bytes = this.captured.len(),
116 "gRPC response stream completed"
117 );
118 }
119 Poll::Ready(None)
120 }
121 Poll::Pending => Poll::Pending,
122 }
123 }
124
125 fn is_end_stream(&self) -> bool {
126 self.inner.is_end_stream()
127 }
128
129 fn size_hint(&self) -> http_body::SizeHint {
130 self.inner.size_hint()
131 }
132}