Skip to main content

relay_core_lib/proxy/
tap.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use tokio::sync::mpsc::Sender;
4use hyper::body::{Body, Frame, SizeHint, Bytes};
5use relay_core_api::flow::{FlowUpdate, BodyData, Direction};
6use crate::interceptor::{HttpBody, BoxError};
7use crate::proxy::body_codec::process_body;
8
9pub struct TapBody {
10    inner: HttpBody,
11    flow_id: String,
12    on_flow: Sender<FlowUpdate>,
13    direction: Direction,
14    buffer: Vec<u8>,
15    limit: usize,
16    headers: Vec<(String, String)>,
17}
18
19impl TapBody {
20    pub fn new(
21        inner: HttpBody,
22        flow_id: String,
23        on_flow: Sender<FlowUpdate>,
24        direction: Direction,
25        limit: usize,
26        headers: Vec<(String, String)>,
27    ) -> Self {
28        Self {
29            inner,
30            flow_id,
31            on_flow,
32            direction,
33            buffer: Vec::new(),
34            limit,
35            headers,
36        }
37    }
38}
39
40impl Body for TapBody {
41    type Data = Bytes;
42    type Error = BoxError;
43
44    fn poll_frame(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context<'_>,
47    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
48        match Pin::new(&mut self.inner).poll_frame(cx) {
49            Poll::Ready(Some(Ok(frame))) => {
50                if let Some(data) = frame.data_ref() {
51                    if self.buffer.len() < self.limit {
52                        let len = std::cmp::min(data.len(), self.limit - self.buffer.len());
53                        self.buffer.extend_from_slice(&data[..len]);
54                    }
55                }
56                Poll::Ready(Some(Ok(frame)))
57            },
58            Poll::Ready(None) => {
59                // Stream ended, send update
60                let (encoding, content) = process_body(&self.buffer, &self.headers);
61                let body_data = BodyData {
62                    encoding,
63                    content,
64                    size: self.buffer.len() as u64, // Note: This is captured size, not total size if truncated
65                };
66                
67                let _ = self.on_flow.try_send(FlowUpdate::HttpBody {
68                    flow_id: self.flow_id.clone(),
69                    direction: self.direction.clone(),
70                    body: body_data,
71                });
72                
73                Poll::Ready(None)
74            },
75            other => other,
76        }
77    }
78
79    fn is_end_stream(&self) -> bool {
80        self.inner.is_end_stream()
81    }
82
83    fn size_hint(&self) -> SizeHint {
84        self.inner.size_hint()
85    }
86}