soth-mitm 0.2.2

Rust intercepting proxy crate with deterministic handler/event contracts for SOTH.
Documentation
use super::close_codes::CloseReasonCode;
use super::event_emitters::emit_stream_closed;
use super::flow_forward_proxy_http1_helpers::{
    is_self_listener_target, resolve_forward_http_route,
};
use super::flow_hooks::FlowHooks;
use super::flow_intercept_http1::relay_http1_mitm_loop;
use super::flow_policy_snapshot::resolve_flow_policy_snapshot;
use super::http_head_parser::parse_http_request_head;
use super::io_timeouts::{
    copy_bidirectional_with_websocket_idle_timeout, is_idle_watchdog_timeout,
    is_stream_stage_timeout, shutdown_with_idle_timeout, write_all_with_idle_timeout,
};
use super::route_planner_model::{FlowRoutePlanner, RouteConnectIntent};
use super::route_planner_transport::connect_via_route;
use super::runtime_governor;
use super::BufferedConn;
use crate::engine::MitmEngine;
use crate::observe::{EventConsumer, FlowContext};
use crate::policy::{FlowAction, PolicyEngine};
use crate::protocol::ApplicationProtocol;
use crate::types::ProcessInfo;
use std::io;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_forward_http1_proxy_request<P, S, D>(
    engine: Arc<MitmEngine<P, S>>,
    runtime_governor: Arc<runtime_governor::RuntimeGovernor>,
    flow_hooks: Arc<dyn FlowHooks>,
    downstream: D,
    client_addr: String,
    flow_id: crate::types::FlowId,
    process_info: Option<ProcessInfo>,
    initial_head: Vec<u8>,
    max_http_head_bytes: usize,
    listener_addr: Option<std::net::SocketAddr>,
    flow_guard: &mut Option<crate::server::runtime_governor::FlowRuntimeGuard>,
) -> io::Result<()>
where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
    D: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let mut downstream = downstream;
    let request = match parse_http_request_head(&initial_head) {
        Ok(parsed) => parsed,
        Err(error) => {
            let context = FlowContext {
                flow_id,
                client_addr,
                server_host: "<unknown>".to_string(),
                server_port: 0,
                protocol: ApplicationProtocol::Http1,
            };
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::MitmHttpError,
                Some(format!("invalid forward-proxy request: {error}")),
                None,
                None,
            );
            write_forward_proxy_error_response(
                &mut downstream,
                "400 Bad Request",
                "invalid HTTP proxy request",
            )
            .await?;
            return Ok(());
        }
    };

    let target = match resolve_forward_http_route(&request) {
        Ok(value) => value,
        Err(error) => {
            let context = FlowContext {
                flow_id,
                client_addr,
                server_host: "<unknown>".to_string(),
                server_port: 0,
                protocol: ApplicationProtocol::Http1,
            };
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::MitmHttpError,
                Some(format!("invalid forward-proxy route: {error}")),
                None,
                None,
            );
            write_forward_proxy_error_response(
                &mut downstream,
                "400 Bad Request",
                "invalid HTTP proxy target",
            )
            .await?;
            return Ok(());
        }
    };

    let (listen_addr, listen_port) = listener_addr
        .map(|addr| (addr.ip().to_string(), addr.port()))
        .unwrap_or_else(|| (engine.config.listen_addr.clone(), engine.config.listen_port));

    if is_self_listener_target(&target.host, target.port, &listen_addr, listen_port) {
        let context = FlowContext {
            flow_id,
            client_addr,
            server_host: target.host.clone(),
            server_port: target.port,
            protocol: ApplicationProtocol::Http1,
        };
        emit_stream_closed(
            &engine,
            context,
            CloseReasonCode::RoutePlannerFailed,
            Some(format!(
                "forward-proxy self-target loop detected: {}:{} matches listener {}:{}",
                target.host, target.port, listen_addr, listen_port
            )),
            None,
            None,
        );
        write_forward_proxy_error_response(
            &mut downstream,
            "508 Loop Detected",
            "forward proxy self-target not allowed",
        )
        .await?;
        return Ok(());
    }

    let mut route_planner = FlowRoutePlanner::default();
    let route = match route_planner.bind_once(&engine.config, target) {
        Ok(value) => value,
        Err(error) => {
            let context = FlowContext {
                flow_id,
                client_addr,
                server_host: "<unknown>".to_string(),
                server_port: 0,
                protocol: ApplicationProtocol::Http1,
            };
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::RoutePlannerFailed,
                Some(format!("forward-proxy route planner failed: {error}")),
                None,
                None,
            );
            write_forward_proxy_error_response(
                &mut downstream,
                "502 Bad Gateway",
                "route planner failed for forward proxy request",
            )
            .await?;
            return Ok(());
        }
    };

    let policy_snapshot = resolve_flow_policy_snapshot(
        &engine,
        flow_id,
        client_addr.clone(),
        route.target_host.clone(),
        route.target_port,
        route.policy_path.clone(),
        process_info,
    );
    let context = FlowContext {
        flow_id: policy_snapshot.flow_id,
        client_addr,
        server_host: route.target_host.clone(),
        server_port: route.target_port,
        protocol: ApplicationProtocol::Http1,
    };

    if policy_snapshot.action == FlowAction::Block {
        write_forward_proxy_error_response(
            &mut downstream,
            "403 Forbidden",
            &policy_snapshot.reason,
        )
        .await?;
        emit_stream_closed(
            &engine,
            context,
            CloseReasonCode::Blocked,
            Some(policy_snapshot.reason),
            None,
            None,
        );
        return Ok(());
    }

    let upstream_tcp = match connect_via_route(&route, RouteConnectIntent::ForwardHttpRequest).await
    {
        Ok(stream) => stream,
        Err(error) => {
            write_forward_proxy_error_response(
                &mut downstream,
                "502 Bad Gateway",
                &format!(
                    "upstream_connect_failed[{}]: {error}",
                    route.route_mode_label()
                ),
            )
            .await?;
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::UpstreamConnectFailed,
                Some(error.to_string()),
                None,
                None,
            );
            return Ok(());
        }
    };

    if policy_snapshot.action == FlowAction::Tunnel {
        // No permit needed for tunnel — never acquired.
        return tunnel_http1_forward_stream(
            engine,
            context,
            downstream,
            upstream_tcp,
            initial_head,
        )
        .await;
    }

    let mut downstream_conn = BufferedConn::new(downstream);
    downstream_conn.read_buf = initial_head;
    let upstream_conn = BufferedConn::new(upstream_tcp);
    relay_http1_mitm_loop(
        engine,
        runtime_governor,
        flow_hooks,
        context,
        route.request_target_mode,
        downstream_conn,
        upstream_conn,
        max_http_head_bytes,
        policy_snapshot.override_state.strict_header_mode,
    )
    .await
}

async fn tunnel_http1_forward_stream<P, S, D>(
    engine: Arc<MitmEngine<P, S>>,
    context: FlowContext,
    mut downstream: D,
    mut upstream: TcpStream,
    initial_head: Vec<u8>,
) -> io::Result<()>
where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
    D: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let first_request = match parse_http_request_head(&initial_head) {
        Ok(value) => value,
        Err(error) => {
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::MitmHttpError,
                Some(format!(
                    "invalid first HTTP request in tunnel mode: {error}"
                )),
                None,
                None,
            );
            return Ok(());
        }
    };
    if let Err(error) = write_all_with_idle_timeout(
        &mut upstream,
        &first_request.raw,
        "forward_tunnel_initial_request_write",
    )
    .await
    {
        emit_stream_closed(
            &engine,
            context,
            CloseReasonCode::RelayError,
            Some(format!("forward initial request failed: {error}")),
            None,
            None,
        );
        return Ok(());
    }

    match copy_bidirectional_with_websocket_idle_timeout(&mut downstream, &mut upstream).await {
        Ok((from_client, from_server)) => {
            // Forward-proxy tunnel: blind TCP passthrough, no body budget applies.
            emit_stream_closed(
                &engine,
                context,
                CloseReasonCode::RelayEof,
                None,
                Some(from_client + (initial_head.len() as u64)),
                Some(from_server),
            );
            Ok(())
        }
        Err(error) => {
            let reason = if is_idle_watchdog_timeout(&error) {
                CloseReasonCode::IdleWatchdogTimeout
            } else if is_stream_stage_timeout(&error) {
                CloseReasonCode::StreamStageTimeout
            } else {
                CloseReasonCode::RelayError
            };
            emit_stream_closed(
                &engine,
                context,
                reason,
                Some(error.to_string()),
                None,
                None,
            );
            if error.kind() == io::ErrorKind::InvalidData
                && error.to_string().contains("copy_bidirectional")
            {
                let _ = write_forward_proxy_error_response(
                    &mut downstream,
                    "400 Bad Request",
                    "HTTP proxy stream relay failed",
                )
                .await;
            }
            Ok(())
        }
    }
}

/// Sends a minimal `HTTP/1.1` error response and then shuts down the write side of `downstream`.
///
/// The response carries only `Content-Length` (the body's length in bytes) and
/// `Connection: close`, followed by `body`. Both the write and the shutdown go through the
/// idle-timeout wrappers.
///
/// # Errors
/// Returns the first error from either the write or the shutdown.
pub(crate) async fn write_forward_proxy_error_response<D>(
    downstream: &mut D,
    status: &str,
    body: &str,
) -> io::Result<()>
where
    D: AsyncWrite + Unpin,
{
    // `str::len` is a byte count, which is exactly what Content-Length requires.
    let content_length = body.len();
    let payload = format!(
        "HTTP/1.1 {status}\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
    );

    write_all_with_idle_timeout(
        downstream,
        payload.as_bytes(),
        "forward_proxy_error_response_write",
    )
    .await?;

    shutdown_with_idle_timeout(downstream, "forward_proxy_error_response_shutdown").await
}