soth-mitm 0.2.0

Rust intercepting proxy crate with deterministic handler/event contracts for SOTH.
Documentation
use super::http2_relay_support::{h2_error_to_io, H2_FORWARD_CHUNK_LIMIT};
use super::io_timeouts::with_h2_body_idle_timeout;
use super::runtime_governor;
use std::io;
use std::sync::Arc;

pub(crate) async fn send_h2_data_with_backpressure(
    sink: &mut h2::SendStream<bytes::Bytes>,
    runtime_governor: &Arc<runtime_governor::RuntimeGovernor>,
    mut data: bytes::Bytes,
    end_stream: bool,
) -> io::Result<()> {
    if data.is_empty() {
        sink.send_data(data, end_stream)
            .map_err(|error| h2_error_to_io("sending HTTP/2 data frame failed", error))?;
        return Ok(());
    }

    while !data.is_empty() {
        // Reserve capacity in bounded slices so large buffered bodies cannot deadlock
        // waiting for a full remaining-window grant before sending the first frame.
        let desired_capacity = data.len().min(H2_FORWARD_CHUNK_LIMIT);
        let available_capacity = wait_for_h2_capacity(sink, desired_capacity).await?;
        let send_len = available_capacity
            .min(data.len())
            .min(H2_FORWARD_CHUNK_LIMIT);
        if send_len == 0 {
            continue;
        }
        let chunk = data.split_to(send_len);
        let _in_flight_lease =
            runtime_governor.reserve_in_flight_or_error(send_len, "http2_data_frame_write")?;
        let is_last = data.is_empty();
        sink.send_data(chunk, end_stream && is_last)
            .map_err(|error| h2_error_to_io("sending HTTP/2 data frame failed", error))?;
    }

    Ok(())
}

async fn wait_for_h2_capacity(
    sink: &mut h2::SendStream<bytes::Bytes>,
    desired: usize,
) -> io::Result<usize> {
    with_h2_body_idle_timeout("http2_send_capacity_wait", async {
        runtime_governor::mark_backpressure_activation_global();
        sink.reserve_capacity(desired);
        loop {
            match std::future::poll_fn(|cx| sink.poll_capacity(cx)).await {
                Some(Ok(capacity)) if capacity > 0 => return Ok(capacity),
                Some(Ok(_)) => {
                    runtime_governor::mark_backpressure_activation_global();
                    continue;
                }
                Some(Err(error)) => {
                    return Err(h2_error_to_io("polling HTTP/2 send capacity failed", error));
                }
                None => {
                    return Err(io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "HTTP/2 send stream closed before capacity became available",
                    ));
                }
            }
        }
    })
    .await
}