relay-core-lib 0.3.0

[Internal] Transport and interception engine for relay-core-runtime. Use `relay-core-runtime` instead.
Documentation
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::Sender;
use hyper::body::{Body, Frame, SizeHint, Bytes};
use relay_core_api::flow::{FlowUpdate, BodyData, Direction};
use crate::interceptor::{HttpBody, BoxError};
use crate::proxy::body_codec::process_body;

/// Pass-through body wrapper that records a bounded prefix of the payload.
///
/// Every frame produced by the wrapped body is forwarded unchanged. Along the
/// way, up to `limit` bytes of data frames are copied into an internal buffer.
/// Once the stream ends, the captured bytes are handed to `process_body`
/// together with the stored headers and published on `on_flow` as a single
/// `FlowUpdate::HttpBody`.
pub struct TapBody {
    /// Wrapped body; its frames are returned to the caller untouched.
    inner: HttpBody,
    /// Identifier attached to the emitted `FlowUpdate::HttpBody`.
    flow_id: String,
    /// Channel on which the captured body is published.
    on_flow: Sender<FlowUpdate>,
    /// Direction tag attached to the emitted update.
    direction: Direction,
    /// Captured payload prefix; never grows beyond `limit` bytes.
    buffer: Vec<u8>,
    /// Maximum number of payload bytes to capture.
    limit: usize,
    /// Headers passed to `process_body` alongside the captured bytes.
    headers: Vec<(String, String)>,
    /// Set once the update has been sent, so it is published at most once.
    reported: bool,
}

impl TapBody {
    /// Wraps `inner`, capturing at most `limit` bytes of its data frames.
    ///
    /// `flow_id` and `direction` are copied into the update sent on `on_flow`
    /// when the stream ends; `headers` are forwarded to `process_body`.
    pub fn new(
        inner: HttpBody,
        flow_id: String,
        on_flow: Sender<FlowUpdate>,
        direction: Direction,
        limit: usize,
        headers: Vec<(String, String)>,
    ) -> Self {
        Self {
            inner,
            flow_id,
            on_flow,
            direction,
            buffer: Vec::new(),
            limit,
            headers,
            reported: false,
        }
    }

    /// Publishes the captured bytes as a `FlowUpdate::HttpBody`, at most once.
    ///
    /// Delivery is best-effort: `try_send` never blocks the body stream, and a
    /// full or closed channel simply drops the update.
    fn report(&mut self) {
        if self.reported {
            return;
        }
        self.reported = true;

        let (encoding, content) = process_body(&self.buffer, &self.headers);
        let body = BodyData {
            encoding,
            content,
            // Number of captured bytes (capped at `limit`), which may be
            // smaller than the full body.
            size: self.buffer.len() as u64,
        };

        let _ = self.on_flow.try_send(FlowUpdate::HttpBody {
            flow_id: self.flow_id.clone(),
            direction: self.direction.clone(),
            body,
        });
    }
}

impl Body for TapBody {
    type Data = Bytes;
    type Error = BoxError;

    /// Polls the wrapped body and returns its result unchanged.
    ///
    /// Data frames are copied into the capture buffer until `limit` is reached.
    /// The capture is published when the inner body yields `None`, or as soon
    /// as it reports `is_end_stream()` after a frame. The latter matters
    /// because callers that honour `is_end_stream` may stop polling before
    /// ever observing `None`, which would otherwise lose the update. Errors
    /// and `Pending` are passed through without publishing anything.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.get_mut();
        let polled = Pin::new(&mut this.inner).poll_frame(cx);

        match &polled {
            Poll::Ready(Some(Ok(frame))) => {
                if let Some(data) = frame.data_ref() {
                    // Copy only as much as still fits under the capture limit.
                    let room = this.limit.saturating_sub(this.buffer.len());
                    let take = data.len().min(room);
                    this.buffer.extend_from_slice(&data[..take]);
                }
                // The caller may not poll again once end-of-stream is
                // signalled, so publish now rather than waiting for `None`.
                if this.inner.is_end_stream() {
                    this.report();
                }
            }
            Poll::Ready(None) => this.report(),
            _ => {}
        }

        polled
    }

    /// Delegates to the wrapped body.
    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    /// Delegates to the wrapped body; tapping does not alter the payload size.
    fn size_hint(&self) -> SizeHint {
        self.inner.size_hint()
    }
}