relay_core_lib/proxy/
tap.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3use tokio::sync::mpsc::Sender;
4use hyper::body::{Body, Frame, SizeHint, Bytes};
5use relay_core_api::flow::{FlowUpdate, BodyData, Direction};
6use crate::interceptor::{HttpBody, BoxError};
7use crate::proxy::body_codec::process_body;
8
9pub struct TapBody {
10 inner: HttpBody,
11 flow_id: String,
12 on_flow: Sender<FlowUpdate>,
13 direction: Direction,
14 buffer: Vec<u8>,
15 limit: usize,
16 headers: Vec<(String, String)>,
17}
18
19impl TapBody {
20 pub fn new(
21 inner: HttpBody,
22 flow_id: String,
23 on_flow: Sender<FlowUpdate>,
24 direction: Direction,
25 limit: usize,
26 headers: Vec<(String, String)>,
27 ) -> Self {
28 Self {
29 inner,
30 flow_id,
31 on_flow,
32 direction,
33 buffer: Vec::new(),
34 limit,
35 headers,
36 }
37 }
38}
39
40impl Body for TapBody {
41 type Data = Bytes;
42 type Error = BoxError;
43
44 fn poll_frame(
45 mut self: Pin<&mut Self>,
46 cx: &mut Context<'_>,
47 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
48 match Pin::new(&mut self.inner).poll_frame(cx) {
49 Poll::Ready(Some(Ok(frame))) => {
50 if let Some(data) = frame.data_ref()
51 && self.buffer.len() < self.limit {
52 let len = std::cmp::min(data.len(), self.limit - self.buffer.len());
53 self.buffer.extend_from_slice(&data[..len]);
54 }
55 Poll::Ready(Some(Ok(frame)))
56 },
57 Poll::Ready(None) => {
58 let (encoding, content) = process_body(&self.buffer, &self.headers);
59 let body_data = BodyData {
60 encoding,
61 content,
62 size: self.buffer.len() as u64,
63 };
64
65 let _ = self.on_flow.try_send(FlowUpdate::HttpBody {
66 flow_id: self.flow_id.clone(),
67 direction: self.direction.clone(),
68 body: body_data,
69 });
70
71 Poll::Ready(None)
72 },
73 other => other,
74 }
75 }
76
77 fn is_end_stream(&self) -> bool {
78 self.inner.is_end_stream()
79 }
80
81 fn size_hint(&self) -> SizeHint {
82 self.inner.size_hint()
83 }
84}