Skip to main content

relay_core_lib/proxy/
tap.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use tokio::sync::mpsc::Sender;
4use hyper::body::{Body, Frame, SizeHint, Bytes};
5use relay_core_api::flow::{FlowUpdate, BodyData, Direction};
6use crate::interceptor::{HttpBody, BoxError};
7use crate::proxy::body_codec::process_body;
8
9pub struct TapBody {
10    inner: HttpBody,
11    flow_id: String,
12    on_flow: Sender<FlowUpdate>,
13    direction: Direction,
14    buffer: Vec<u8>,
15    limit: usize,
16    headers: Vec<(String, String)>,
17}
18
19impl TapBody {
20    pub fn new(
21        inner: HttpBody,
22        flow_id: String,
23        on_flow: Sender<FlowUpdate>,
24        direction: Direction,
25        limit: usize,
26        headers: Vec<(String, String)>,
27    ) -> Self {
28        Self {
29            inner,
30            flow_id,
31            on_flow,
32            direction,
33            buffer: Vec::new(),
34            limit,
35            headers,
36        }
37    }
38}
39
40impl Body for TapBody {
41    type Data = Bytes;
42    type Error = BoxError;
43
44    fn poll_frame(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context<'_>,
47    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
48        match Pin::new(&mut self.inner).poll_frame(cx) {
49            Poll::Ready(Some(Ok(frame))) => {
50                if let Some(data) = frame.data_ref()
51                    && self.buffer.len() < self.limit {
52                        let len = std::cmp::min(data.len(), self.limit - self.buffer.len());
53                        self.buffer.extend_from_slice(&data[..len]);
54                    }
55                Poll::Ready(Some(Ok(frame)))
56            },
57            Poll::Ready(None) => {
58                let (encoding, content) = process_body(&self.buffer, &self.headers);
59                let body_data = BodyData {
60                    encoding,
61                    content,
62                    size: self.buffer.len() as u64,
63                };
64                
65                let _ = self.on_flow.try_send(FlowUpdate::HttpBody {
66                    flow_id: self.flow_id.clone(),
67                    direction: self.direction.clone(),
68                    body: body_data,
69                });
70                
71                Poll::Ready(None)
72            },
73            other => other,
74        }
75    }
76
77    fn is_end_stream(&self) -> bool {
78        self.inner.is_end_stream()
79    }
80
81    fn size_hint(&self) -> SizeHint {
82        self.inner.size_hint()
83    }
84}