Skip to main content

relay_core_lib/proxy/
tap.rs

1use crate::interceptor::{BoxError, HttpBody};
2use crate::proxy::body_codec::process_body;
3use hyper::body::{Body, Bytes, Frame, SizeHint};
4use relay_core_api::flow::{BodyData, Direction, FlowUpdate};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc::Sender;
8
9pub struct TapBody {
10    inner: HttpBody,
11    flow_id: String,
12    on_flow: Sender<FlowUpdate>,
13    direction: Direction,
14    buffer: Vec<u8>,
15    limit: usize,
16    headers: Vec<(String, String)>,
17}
18
19impl TapBody {
20    pub fn new(
21        inner: HttpBody,
22        flow_id: String,
23        on_flow: Sender<FlowUpdate>,
24        direction: Direction,
25        limit: usize,
26        headers: Vec<(String, String)>,
27    ) -> Self {
28        Self {
29            inner,
30            flow_id,
31            on_flow,
32            direction,
33            buffer: Vec::new(),
34            limit,
35            headers,
36        }
37    }
38}
39
40impl Body for TapBody {
41    type Data = Bytes;
42    type Error = BoxError;
43
44    fn poll_frame(
45        mut self: Pin<&mut Self>,
46        cx: &mut Context<'_>,
47    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
48        match Pin::new(&mut self.inner).poll_frame(cx) {
49            Poll::Ready(Some(Ok(frame))) => {
50                if let Some(data) = frame.data_ref()
51                    && self.buffer.len() < self.limit
52                {
53                    let len = std::cmp::min(data.len(), self.limit - self.buffer.len());
54                    self.buffer.extend_from_slice(&data[..len]);
55                }
56                Poll::Ready(Some(Ok(frame)))
57            }
58            Poll::Ready(None) => {
59                let (encoding, content) = process_body(&self.buffer, &self.headers);
60                let body_data = BodyData {
61                    encoding,
62                    content,
63                    size: self.buffer.len() as u64,
64                };
65
66                let _ = self.on_flow.try_send(FlowUpdate::HttpBody {
67                    flow_id: self.flow_id.clone(),
68                    direction: self.direction.clone(),
69                    body: body_data,
70                });
71
72                Poll::Ready(None)
73            }
74            other => other,
75        }
76    }
77
78    fn is_end_stream(&self) -> bool {
79        self.inner.is_end_stream()
80    }
81
82    fn size_hint(&self) -> SizeHint {
83        self.inner.size_hint()
84    }
85}