soth-mitm 0.2.0

Rust intercepting proxy crate with deterministic handler/event contracts for SOTH.
Documentation
use super::flow_hook_http_helpers::strip_trailer_forbidden_and_transport_headers;
use super::flow_hooks::FlowHooks;
use super::http2_relay_support::h2_error_to_io;
use super::http2_stream_hook_dispatch::H2CapturedBody;
use super::http2_stream_relay_body::send_h2_data_with_backpressure;
use super::http2_stream_relay_http1_body::{
    read_http1_chunked_trailers_as_header_map, read_http1_response_chunk_allow_eof,
    read_http1_response_chunk_non_eof,
};
use super::http2_stream_response_relay::H2ResponseStreamHookDispatcher;
use super::http_body_relay::{parse_chunk_len, read_chunk_line, read_exact_from_source};
use super::io_timeouts::with_h2_body_idle_timeout;
use super::runtime_governor;
use super::{BufferedConn, HttpBodyMode, IO_CHUNK_SIZE};
use crate::config::H2ResponseOverflowMode;
use crate::observe::FlowContext;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncRead;

/// Result of relaying an HTTP/1 response body onto an HTTP/2 response stream.
pub(crate) struct Http1ToH2ResponseRelayOutcome {
    /// Handler-facing capture: the retained body prefix, the total number of bytes
    /// forwarded downstream, whether retention was truncated, and the forwarded trailers.
    pub(crate) captured: H2CapturedBody,
    /// Trailers sent downstream after forbidden/transport headers were stripped, or
    /// `None` when the stream was instead closed with an empty END_STREAM data frame.
    pub(crate) observed_trailers: Option<http::HeaderMap>,
}

/// Bookkeeping for an HTTP/1 response body while it is forwarded chunk by chunk.
///
/// Every forwarded byte is counted. Only a prefix up to the handler body budget is
/// retained. Once a chunk would exceed that budget, the capture is marked truncated
/// and no further bytes are retained.
#[derive(Default)]
struct Http1ResponseCaptureState {
    /// Total number of body bytes observed so far (retained or not).
    total_forwarded: u64,
    /// Retained body prefix that will be handed to handlers.
    captured: Vec<u8>,
    /// Set once the body has outgrown the handler budget.
    body_truncated: bool,
}

impl Http1ResponseCaptureState {
    /// Creates an empty state: nothing counted, nothing retained, not truncated.
    fn new() -> Self {
        Self::default()
    }

    /// Counts `chunk` and retains as much of it as `max_handler_body` still allows.
    ///
    /// Returns `true` only on the call where the budget is first exceeded. Calls that
    /// fit within the budget, and every call after truncation, return `false`.
    fn observe_chunk(&mut self, chunk: &[u8], max_handler_body: usize) -> bool {
        self.total_forwarded += chunk.len() as u64;
        if self.body_truncated {
            // Already over budget: keep counting, stop retaining.
            return false;
        }
        let room = max_handler_body.saturating_sub(self.captured.len());
        let kept = room.min(chunk.len());
        self.captured.extend_from_slice(&chunk[..kept]);
        let overflowed = kept < chunk.len();
        self.body_truncated = overflowed;
        overflowed
    }

    /// Consumes the state and builds the handler-facing capture, attaching `trailers`.
    fn into_captured(self, trailers: Option<http::HeaderMap>) -> H2CapturedBody {
        let Self {
            total_forwarded,
            captured,
            body_truncated,
        } = self;
        H2CapturedBody {
            bytes: bytes::Bytes::from(captured),
            bytes_forwarded: total_forwarded,
            trailers,
            body_truncated,
        }
    }
}

/// Relays an HTTP/1 response body from `source` onto an HTTP/2 response stream,
/// forwarding each piece downstream as soon as it is read.
///
/// `mode` selects the body framing:
/// - no body;
/// - a fixed `Content-Length`;
/// - chunked transfer coding, which is the only mode that can produce trailers;
/// - read-until-EOF.
///
/// At most `max_handler_body` bytes are retained for handlers. When the body exceeds
/// that budget, `h2_response_overflow_strict == true` aborts the relay with
/// `InvalidData`. Otherwise the relay keeps forwarding and marks the capture as
/// truncated.
///
/// After the body, the downstream stream is closed in one of two ways. If any trailers
/// survive stripping of forbidden/transport headers, a trailers frame is sent.
/// Otherwise an empty data frame with END_STREAM is sent. The optional stream hook
/// dispatcher is then finished.
///
/// # Errors
/// Propagates errors from reading the upstream body, from the strict-mode budget check,
/// and from sending on the downstream stream. On any error the dispatcher is not
/// finished.
// Per-stream relay context is passed explicitly rather than bundled into a struct.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn relay_http1_response_body_with_incremental_forwarding<U>(
    source: &mut BufferedConn<U>,
    mode: HttpBodyMode,
    max_http_head_bytes: usize,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    downstream_response_stream: &mut h2::SendStream<bytes::Bytes>,
    flow_hooks: &Arc<dyn FlowHooks>,
    stream_context: &FlowContext,
    stream_dispatcher: &mut Option<H2ResponseStreamHookDispatcher>,
    max_handler_body: usize,
    h2_response_overflow_strict: bool,
) -> io::Result<Http1ToH2ResponseRelayOutcome>
where
    U: AsyncRead + Unpin,
{
    let overflow_mode = match h2_response_overflow_strict {
        true => H2ResponseOverflowMode::StrictFail,
        false => H2ResponseOverflowMode::TruncateContinue,
    };
    let mut capture_state = Http1ResponseCaptureState::new();

    let raw_trailers = match mode {
        HttpBodyMode::None => None,
        HttpBodyMode::ContentLength(length) => {
            relay_http1_content_length_body(
                source,
                length,
                runtime_governor,
                downstream_response_stream,
                flow_hooks,
                stream_context,
                stream_dispatcher,
                &mut capture_state,
                max_handler_body,
                overflow_mode,
            )
            .await?;
            None
        }
        HttpBodyMode::Chunked => {
            relay_http1_chunked_body(
                source,
                max_http_head_bytes,
                runtime_governor,
                downstream_response_stream,
                flow_hooks,
                stream_context,
                stream_dispatcher,
                &mut capture_state,
                max_handler_body,
                overflow_mode,
            )
            .await?
        }
        HttpBodyMode::CloseDelimited => {
            relay_http1_close_delimited_body(
                source,
                runtime_governor,
                downstream_response_stream,
                flow_hooks,
                stream_context,
                stream_dispatcher,
                &mut capture_state,
                max_handler_body,
                overflow_mode,
            )
            .await?;
            None
        }
    };

    // Strip headers that must not travel as trailers. A map left empty is treated
    // as "no trailers".
    let trailers = raw_trailers.and_then(|mut map| {
        strip_trailer_forbidden_and_transport_headers(&mut map);
        (!map.is_empty()).then_some(map)
    });

    // Close the downstream stream: via a trailers frame when there are trailers,
    // otherwise via an empty END_STREAM data frame.
    let observed_trailers = match trailers.as_ref() {
        Some(to_send) => {
            downstream_response_stream
                .send_trailers(to_send.clone())
                .map_err(|error| h2_error_to_io("sending HTTP/2 trailers failed", error))?;
            Some(to_send.clone())
        }
        None => {
            send_h2_data_with_backpressure(
                downstream_response_stream,
                runtime_governor,
                bytes::Bytes::new(),
                true,
            )
            .await?;
            None
        }
    };

    if let Some(dispatcher) = stream_dispatcher.as_mut() {
        dispatcher.finish(flow_hooks, stream_context).await;
    }

    let captured = capture_state.into_captured(trailers);
    Ok(Http1ToH2ResponseRelayOutcome {
        captured,
        observed_trailers,
    })
}

/// Relays exactly `length` body bytes from `source`.
///
/// Each read requests at most `IO_CHUNK_SIZE` bytes, and every piece is handed to
/// `forward_http1_response_chunk` as soon as it arrives.
///
/// # Errors
/// Fails in three cases:
/// - a read from `read_http1_response_chunk_non_eof` fails;
/// - a read yields more bytes than are still outstanding;
/// - forwarding a piece fails.
// Per-stream relay context is passed explicitly rather than bundled into a struct.
#[allow(clippy::too_many_arguments)]
async fn relay_http1_content_length_body<U>(
    source: &mut BufferedConn<U>,
    length: u64,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    downstream_response_stream: &mut h2::SendStream<bytes::Bytes>,
    flow_hooks: &Arc<dyn FlowHooks>,
    stream_context: &FlowContext,
    stream_dispatcher: &mut Option<H2ResponseStreamHookDispatcher>,
    capture_state: &mut Http1ResponseCaptureState,
    max_handler_body: usize,
    overflow_mode: H2ResponseOverflowMode,
) -> io::Result<()>
where
    U: AsyncRead + Unpin,
{
    let mut outstanding = length;
    while outstanding != 0 {
        // Never request more than one I/O chunk, nor more than is still owed.
        let want = outstanding.min(IO_CHUNK_SIZE as u64) as usize;
        let piece = read_http1_response_chunk_non_eof(
            source,
            want,
            "http2_to_http1_response_body_next_chunk",
        )
        .await?;
        let Some(left) = outstanding.checked_sub(piece.len() as u64) else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "upstream sent more bytes than declared content-length",
            ));
        };
        outstanding = left;
        forward_http1_response_chunk(
            piece,
            runtime_governor,
            downstream_response_stream,
            flow_hooks,
            stream_context,
            stream_dispatcher,
            capture_state,
            max_handler_body,
            overflow_mode,
        )
        .await?;
    }
    Ok(())
}

/// Relays a chunked (`Transfer-Encoding: chunked`) HTTP/1 body from `source`, then
/// reads and returns its trailer section.
///
/// For each chunk, the size line is read under the body idle timeout. The chunk data
/// is forwarded in pieces of at most `IO_CHUNK_SIZE` bytes, and the trailing CRLF is
/// verified. A zero-size chunk ends the data. The trailers are then parsed by
/// `read_http1_chunked_trailers_as_header_map`, bounded by `max_http_head_bytes`.
///
/// # Errors
/// Possible failures:
/// - an I/O or timeout error;
/// - an unparsable chunk size;
/// - a chunk whose data overruns its declared size, or is not followed by CRLF;
/// - a forwarding failure.
///
/// An unexpected EOF inside chunk data is reported with a chunked-specific message.
// Per-stream relay context is passed explicitly rather than bundled into a struct.
#[allow(clippy::too_many_arguments)]
async fn relay_http1_chunked_body<U>(
    source: &mut BufferedConn<U>,
    max_http_head_bytes: usize,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    downstream_response_stream: &mut h2::SendStream<bytes::Bytes>,
    flow_hooks: &Arc<dyn FlowHooks>,
    stream_context: &FlowContext,
    stream_dispatcher: &mut Option<H2ResponseStreamHookDispatcher>,
    capture_state: &mut Http1ResponseCaptureState,
    max_handler_body: usize,
    overflow_mode: H2ResponseOverflowMode,
) -> io::Result<Option<http::HeaderMap>>
where
    U: AsyncRead + Unpin,
{
    // Gives a premature EOF inside chunk data a message that names the chunked body;
    // every other error passes through unchanged.
    let eof_in_chunk = |error: io::Error| {
        if error.kind() == io::ErrorKind::UnexpectedEof {
            io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "connection closed before chunked response body completed",
            )
        } else {
            error
        }
    };

    loop {
        let size_line =
            with_h2_body_idle_timeout("http2_to_http1_response_body_chunk_line", async {
                read_chunk_line(source, runtime_governor).await
            })
            .await?;
        let mut outstanding = parse_chunk_len(&size_line)?;
        if outstanding == 0 {
            // Last-chunk marker: only the trailer section remains.
            break;
        }

        while outstanding > 0 {
            let want = outstanding.min(IO_CHUNK_SIZE as u64) as usize;
            let piece = read_http1_response_chunk_non_eof(
                source,
                want,
                "http2_to_http1_response_body_next_chunk",
            )
            .await
            .map_err(eof_in_chunk)?;
            outstanding = match outstanding.checked_sub(piece.len() as u64) {
                Some(left) => left,
                None => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "upstream chunk exceeded declared chunk length",
                    ))
                }
            };
            forward_http1_response_chunk(
                piece,
                runtime_governor,
                downstream_response_stream,
                flow_hooks,
                stream_context,
                stream_dispatcher,
                capture_state,
                max_handler_body,
                overflow_mode,
            )
            .await?;
        }

        // Every chunk's data must be followed by exactly CRLF.
        let crlf =
            with_h2_body_idle_timeout("http2_to_http1_response_body_chunk_terminator", async {
                read_exact_from_source(source, 2, runtime_governor).await
            })
            .await?;
        if crlf.as_slice() != b"\r\n" {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk terminator",
            ));
        }
    }

    read_http1_chunked_trailers_as_header_map(source, max_http_head_bytes, runtime_governor)
        .await
}

/// Relays a close-delimited HTTP/1 body.
///
/// Reads pieces of up to `IO_CHUNK_SIZE` bytes from `source` and forwards each one.
/// The loop ends when `read_http1_response_chunk_allow_eof` reports end of input by
/// returning `None`.
///
/// # Errors
/// Propagates read failures and forwarding failures.
// Per-stream relay context is passed explicitly rather than bundled into a struct.
#[allow(clippy::too_many_arguments)]
async fn relay_http1_close_delimited_body<U>(
    source: &mut BufferedConn<U>,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    downstream_response_stream: &mut h2::SendStream<bytes::Bytes>,
    flow_hooks: &Arc<dyn FlowHooks>,
    stream_context: &FlowContext,
    stream_dispatcher: &mut Option<H2ResponseStreamHookDispatcher>,
    capture_state: &mut Http1ResponseCaptureState,
    max_handler_body: usize,
    overflow_mode: H2ResponseOverflowMode,
) -> io::Result<()>
where
    U: AsyncRead + Unpin,
{
    while let Some(piece) = read_http1_response_chunk_allow_eof(
        source,
        IO_CHUNK_SIZE,
        "http2_to_http1_close_delimited_read",
    )
    .await?
    {
        forward_http1_response_chunk(
            piece,
            runtime_governor,
            downstream_response_stream,
            flow_hooks,
            stream_context,
            stream_dispatcher,
            capture_state,
            max_handler_body,
            overflow_mode,
        )
        .await?;
    }
    Ok(())
}

/// Records one body piece in `capture_state`, sends it downstream as a non-final
/// HTTP/2 data frame, and then passes it to the stream hook dispatcher, if any.
///
/// Empty pieces are ignored entirely.
///
/// # Errors
/// - Returns `InvalidData` without sending anything when this piece first pushes the
///   capture past `max_handler_body` and `overflow_mode` is `StrictFail`.
/// - Propagates downstream send failures.
// Per-stream relay context is passed explicitly rather than bundled into a struct.
#[allow(clippy::too_many_arguments)]
async fn forward_http1_response_chunk(
    chunk: Vec<u8>,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    downstream_response_stream: &mut h2::SendStream<bytes::Bytes>,
    flow_hooks: &Arc<dyn FlowHooks>,
    stream_context: &FlowContext,
    stream_dispatcher: &mut Option<H2ResponseStreamHookDispatcher>,
    capture_state: &mut Http1ResponseCaptureState,
    max_handler_body: usize,
    overflow_mode: H2ResponseOverflowMode,
) -> io::Result<()> {
    if chunk.is_empty() {
        return Ok(());
    }

    let first_overflow = capture_state.observe_chunk(&chunk, max_handler_body);
    let strict = matches!(overflow_mode, H2ResponseOverflowMode::StrictFail);
    if first_overflow && strict {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "upstream response body exceeded flow body budget (strict overflow mode)",
        ));
    }

    let payload = bytes::Bytes::from(chunk);
    // `Bytes` clones share the buffer; only make one if a dispatcher will read it.
    let hook_copy = stream_dispatcher.is_some().then(|| payload.clone());
    send_h2_data_with_backpressure(downstream_response_stream, runtime_governor, payload, false)
        .await?;

    if let (Some(dispatcher), Some(copy)) = (stream_dispatcher.as_mut(), hook_copy) {
        dispatcher
            .on_chunk(flow_hooks, stream_context, copy.as_ref())
            .await;
    }
    Ok(())
}