soth-mitm 0.2.2

Rust intercepting proxy crate with deterministic handler/event contracts for SOTH.
Documentation
use super::http2_relay_support::GrpcRequestObservation;
use crate::engine::MitmEngine;
use crate::observe::{Event, EventConsumer, EventType, FlowContext};
use crate::policy::PolicyEngine;

pub(crate) fn emit_sse_event<P, S>(
    engine: &MitmEngine<P, S>,
    context: FlowContext,
    sequence_no: u64,
    event: &crate::protocol::SseEvent,
) where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    let mut emitted = Event::new(EventType::SseEvent, context);
    emitted
        .attributes
        .insert("sequence_no".to_string(), sequence_no.to_string());
    emitted.attributes.insert(
        "data_line_count".to_string(),
        event.data_line_count.to_string(),
    );
    emitted
        .attributes
        .insert("data_len".to_string(), event.data.len().to_string());
    emitted
        .attributes
        .insert("data".to_string(), event.data.clone());
    if let Some(name) = &event.event {
        emitted.attributes.insert("event".to_string(), name.clone());
    }
    if let Some(id) = &event.id {
        emitted.attributes.insert("id".to_string(), id.clone());
    }
    if let Some(retry_ms) = event.retry_ms {
        emitted
            .attributes
            .insert("retry_ms".to_string(), retry_ms.to_string());
    }
    engine.emit_event(emitted);
}

pub(crate) fn emit_http3_passthrough_event<P, S>(
    engine: &MitmEngine<P, S>,
    context: FlowContext,
    requested_by: &str,
    policy_action: &str,
) where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    let mut event = Event::new(EventType::Http3Passthrough, context);
    event
        .attributes
        .insert("passthrough_protocol".to_string(), "http3".to_string());
    event
        .attributes
        .insert("passthrough_mode".to_string(), "tunnel".to_string());
    event
        .attributes
        .insert("requested_by".to_string(), requested_by.to_string());
    event
        .attributes
        .insert("policy_action".to_string(), policy_action.to_string());
    engine.emit_event(event);
}

pub(crate) fn emit_grpc_request_headers_event<P, S>(
    engine: &MitmEngine<P, S>,
    context: FlowContext,
    observation: &GrpcRequestObservation,
    headers: &http::HeaderMap,
) where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    let mut event = Event::new(EventType::GrpcRequestHeaders, context);
    insert_grpc_common_attrs(&mut event, observation);
    event
        .attributes
        .insert("grpc_event_sequence".to_string(), "1".to_string());
    event
        .attributes
        .insert("header_count".to_string(), headers.len().to_string());
    if let Some(te) = header_value(headers, "te") {
        event.attributes.insert("te".to_string(), te);
    }
    if let Some(user_agent) = header_value(headers, "user-agent") {
        event
            .attributes
            .insert("user_agent".to_string(), user_agent);
    }
    engine.emit_event(event);
}

pub(crate) fn emit_grpc_response_headers_event<P, S>(
    engine: &MitmEngine<P, S>,
    context: FlowContext,
    observation: &GrpcRequestObservation,
    response: &http::response::Parts,
) where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    let mut event = Event::new(EventType::GrpcResponseHeaders, context);
    insert_grpc_common_attrs(&mut event, observation);
    event
        .attributes
        .insert("grpc_event_sequence".to_string(), "2".to_string());
    event.attributes.insert(
        "status_code".to_string(),
        response.status.as_u16().to_string(),
    );
    event.attributes.insert(
        "header_count".to_string(),
        response.headers.len().to_string(),
    );
    if let Some(content_type) = header_value(&response.headers, "content-type") {
        event
            .attributes
            .insert("grpc_response_content_type".to_string(), content_type);
    }
    engine.emit_event(event);
}

pub(crate) fn emit_grpc_response_trailers_event<P, S>(
    engine: &MitmEngine<P, S>,
    context: FlowContext,
    observation: &GrpcRequestObservation,
    trailers: &http::HeaderMap,
) where
    P: PolicyEngine + Send + Sync + 'static,
    S: EventConsumer + Send + Sync + 'static,
{
    let mut event = Event::new(EventType::GrpcResponseTrailers, context);
    insert_grpc_common_attrs(&mut event, observation);
    event
        .attributes
        .insert("grpc_event_sequence".to_string(), "3".to_string());
    event
        .attributes
        .insert("trailer_count".to_string(), trailers.len().to_string());
    if let Some(grpc_status) = header_value(trailers, "grpc-status") {
        event
            .attributes
            .insert("grpc_status".to_string(), grpc_status);
    }
    if let Some(grpc_message) = header_value(trailers, "grpc-message") {
        event
            .attributes
            .insert("grpc_message".to_string(), grpc_message);
    }
    engine.emit_event(event);
}

fn insert_grpc_common_attrs(event: &mut Event, observation: &GrpcRequestObservation) {
    event
        .attributes
        .insert("grpc_path".to_string(), observation.path.clone());
    event.attributes.insert(
        "grpc_detection_mode".to_string(),
        observation.detection_mode.to_string(),
    );
    if let Some(service) = &observation.service {
        event
            .attributes
            .insert("grpc_service".to_string(), service.clone());
    }
    if let Some(method) = &observation.method {
        event
            .attributes
            .insert("grpc_method".to_string(), method.clone());
    }
    if let Some(content_type) = &observation.content_type {
        event.attributes.insert(
            "grpc_request_content_type".to_string(),
            content_type.clone(),
        );
    }
}

fn header_value(headers: &http::HeaderMap, name: &str) -> Option<String> {
    headers
        .get(name)
        .and_then(|value| value.to_str().ok())
        .map(ToOwned::to_owned)
}