// soth-mitm 0.3.0
//
// Rust intercepting proxy crate with deterministic handler/event contracts for SOTH.
// Documentation
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;

use crate::observe::{EventConsumer, EventEnvelope, EventType};

/// Point-in-time snapshot of proxy operational metrics.
///
/// Each field is a plain copy of the identically named counter in the
/// crate-internal metrics store, taken when the snapshot was requested.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProxyMetrics {
    /// Connection opens recorded minus closes recorded, never below zero.
    pub active_connections: u64,
    /// Connection opens recorded since the store was created.
    pub total_connections: u64,
    /// Handler panics recorded.
    pub handler_panic_count: u64,
    /// Handler timeouts recorded.
    pub handler_timeout_count: u64,
    /// Upstream connect errors recorded.
    pub upstream_connect_error_count: u64,
    /// Upstream timeouts recorded.
    pub upstream_timeout_count: u64,
    /// Process attribution failures recorded.
    pub process_attribution_failure_count: u64,
    /// Process attribution timeouts recorded.
    pub process_attribution_timeout_count: u64,
    /// Process cache connection hits recorded.
    pub process_cache_connection_hit_count: u64,
    /// Process cache identity hits recorded.
    pub process_cache_identity_hit_count: u64,
    /// Process cache misses recorded.
    pub process_cache_miss_count: u64,
    /// Process cache evictions recorded.
    pub process_cache_eviction_count: u64,
    /// Process PID reuse detections recorded.
    pub process_pid_reuse_detected_count: u64,
    /// Dispatch drops recorded.
    pub dropped_dispatch_work_count: u64,
    /// Stale flow reaps recorded.
    pub stale_flow_reap_count: u64,
    /// Closed flow id evictions recorded.
    pub closed_flow_id_eviction_count: u64,
    /// Missing connection metadata occurrences recorded.
    pub missing_connection_meta_count: u64,
}

/// Shared, mutable home of the proxy counters.
///
/// Holds one [`AtomicU64`] per field of [`ProxyMetrics`]. All updates go
/// through `&self`, and every access uses [`Ordering::Relaxed`]: the counters
/// are independent tallies and are not used to publish other data.
#[derive(Debug, Default)]
pub(crate) struct ProxyMetricsStore {
    active_connections: AtomicU64,
    total_connections: AtomicU64,
    handler_panic_count: AtomicU64,
    handler_timeout_count: AtomicU64,
    upstream_connect_error_count: AtomicU64,
    upstream_timeout_count: AtomicU64,
    process_attribution_failure_count: AtomicU64,
    process_attribution_timeout_count: AtomicU64,
    process_cache_connection_hit_count: AtomicU64,
    process_cache_identity_hit_count: AtomicU64,
    process_cache_miss_count: AtomicU64,
    process_cache_eviction_count: AtomicU64,
    process_pid_reuse_detected_count: AtomicU64,
    dropped_dispatch_work_count: AtomicU64,
    stale_flow_reap_count: AtomicU64,
    closed_flow_id_eviction_count: AtomicU64,
    missing_connection_meta_count: AtomicU64,
}

impl ProxyMetricsStore {
    /// Returns the current value of a single counter.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Adds one to a single counter.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies every counter into a [`ProxyMetrics`] value.
    ///
    /// Counters are read one after another, so a snapshot taken while other
    /// threads are recording may combine values from slightly different
    /// moments.
    pub(crate) fn snapshot(&self) -> ProxyMetrics {
        // Exhaustive destructuring: a counter added to the store but not
        // exported below is flagged by the compiler instead of being
        // silently omitted from snapshots.
        let Self {
            active_connections,
            total_connections,
            handler_panic_count,
            handler_timeout_count,
            upstream_connect_error_count,
            upstream_timeout_count,
            process_attribution_failure_count,
            process_attribution_timeout_count,
            process_cache_connection_hit_count,
            process_cache_identity_hit_count,
            process_cache_miss_count,
            process_cache_eviction_count,
            process_pid_reuse_detected_count,
            dropped_dispatch_work_count,
            stale_flow_reap_count,
            closed_flow_id_eviction_count,
            missing_connection_meta_count,
        } = self;

        ProxyMetrics {
            active_connections: Self::read(active_connections),
            total_connections: Self::read(total_connections),
            handler_panic_count: Self::read(handler_panic_count),
            handler_timeout_count: Self::read(handler_timeout_count),
            upstream_connect_error_count: Self::read(upstream_connect_error_count),
            upstream_timeout_count: Self::read(upstream_timeout_count),
            process_attribution_failure_count: Self::read(process_attribution_failure_count),
            process_attribution_timeout_count: Self::read(process_attribution_timeout_count),
            process_cache_connection_hit_count: Self::read(process_cache_connection_hit_count),
            process_cache_identity_hit_count: Self::read(process_cache_identity_hit_count),
            process_cache_miss_count: Self::read(process_cache_miss_count),
            process_cache_eviction_count: Self::read(process_cache_eviction_count),
            process_pid_reuse_detected_count: Self::read(process_pid_reuse_detected_count),
            dropped_dispatch_work_count: Self::read(dropped_dispatch_work_count),
            stale_flow_reap_count: Self::read(stale_flow_reap_count),
            closed_flow_id_eviction_count: Self::read(closed_flow_id_eviction_count),
            missing_connection_meta_count: Self::read(missing_connection_meta_count),
        }
    }

    /// Records a newly opened connection: bumps the lifetime total and the
    /// active gauge.
    pub(crate) fn record_connection_open(&self) {
        Self::bump(&self.total_connections);
        Self::bump(&self.active_connections);
    }

    /// Records a closed connection by lowering the active gauge.
    ///
    /// The decrement saturates at zero, so a close without a matching open
    /// cannot wrap the gauge around to `u64::MAX`.
    pub(crate) fn record_connection_close(&self) {
        let decrement = |open: u64| Some(open.saturating_sub(1));
        // The closure always yields `Some`, so the update cannot be rejected
        // and the returned `Result` carries no information worth handling.
        let _ = self
            .active_connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, decrement);
    }

    /// Records one handler panic.
    pub(crate) fn record_handler_panic(&self) {
        Self::bump(&self.handler_panic_count);
    }

    /// Records one handler timeout.
    pub(crate) fn record_handler_timeout(&self) {
        Self::bump(&self.handler_timeout_count);
    }

    /// Records one upstream connect error.
    pub(crate) fn record_upstream_connect_error(&self) {
        Self::bump(&self.upstream_connect_error_count);
    }

    /// Records one upstream timeout.
    pub(crate) fn record_upstream_timeout(&self) {
        Self::bump(&self.upstream_timeout_count);
    }

    /// Records one process attribution failure.
    pub(crate) fn record_process_attribution_failure(&self) {
        Self::bump(&self.process_attribution_failure_count);
    }

    /// Records one process attribution timeout.
    pub(crate) fn record_process_attribution_timeout(&self) {
        Self::bump(&self.process_attribution_timeout_count);
    }

    /// Records one process cache connection hit.
    pub(crate) fn record_process_cache_connection_hit(&self) {
        Self::bump(&self.process_cache_connection_hit_count);
    }

    /// Records one process cache identity hit.
    pub(crate) fn record_process_cache_identity_hit(&self) {
        Self::bump(&self.process_cache_identity_hit_count);
    }

    /// Records one process cache miss.
    pub(crate) fn record_process_cache_miss(&self) {
        Self::bump(&self.process_cache_miss_count);
    }

    /// Records one process cache eviction.
    pub(crate) fn record_process_cache_eviction(&self) {
        Self::bump(&self.process_cache_eviction_count);
    }

    /// Records one detected PID reuse.
    pub(crate) fn record_process_pid_reuse_detected(&self) {
        Self::bump(&self.process_pid_reuse_detected_count);
    }

    /// Records one dropped unit of dispatch work.
    pub(crate) fn record_dispatch_drop(&self) {
        Self::bump(&self.dropped_dispatch_work_count);
    }

    /// Records one stale flow reap.
    pub(crate) fn record_stale_flow_reap(&self) {
        Self::bump(&self.stale_flow_reap_count);
    }

    /// Records one closed flow id eviction.
    pub(crate) fn record_closed_flow_id_eviction(&self) {
        Self::bump(&self.closed_flow_id_eviction_count);
    }

    /// Records one occurrence of missing connection metadata.
    pub(crate) fn record_missing_connection_meta(&self) {
        Self::bump(&self.missing_connection_meta_count);
    }
}

/// Event consumer that translates observed proxy events into counter updates
/// on a shared [`ProxyMetricsStore`].
#[derive(Debug)]
pub(crate) struct MetricsEventConsumer {
    store: std::sync::Arc<ProxyMetricsStore>,
}

impl MetricsEventConsumer {
    /// Creates a consumer that records into `store`.
    pub(crate) fn new(store: std::sync::Arc<ProxyMetricsStore>) -> Self {
        MetricsEventConsumer { store }
    }

    /// Applies a `StreamClosed` event to the store.
    ///
    /// Always records a connection close. The `reason_code` and
    /// `reason_detail` attributes then decide which upstream counters move:
    /// `upstream_connect_failed` counts a connect error (and also a timeout
    /// when the detail text looks like one), while `stream_stage_timeout`
    /// counts a timeout only. Any other or missing code changes nothing more.
    fn on_stream_closed(&self, envelope: &EventEnvelope) {
        let event = &envelope.event;
        self.store.record_connection_close();

        let code = event
            .attributes
            .get("reason_code")
            .map(|value| value.as_str());
        // A missing detail is treated as empty text.
        let detail = event
            .attributes
            .get("reason_detail")
            .map(|value| value.as_str())
            .unwrap_or("");

        if stream_closed_trace_enabled() {
            tracing::warn!(
                flow_id = event.context.flow_id.as_u64(),
                server_host = %event.context.server_host,
                server_port = event.context.server_port,
                protocol = ?event.context.protocol,
                reason_code = code.unwrap_or("unknown"),
                reason_detail = detail,
                "stream closed diagnostic"
            );
        }

        // The connect error is recorded first; the timeout, if any, follows.
        let timed_out = match code {
            Some("upstream_connect_failed") => {
                self.store.record_upstream_connect_error();
                is_timeout_reason(detail)
            }
            Some("stream_stage_timeout") => true,
            _ => false,
        };
        if timed_out {
            self.store.record_upstream_timeout();
        }
    }
}

impl EventConsumer for MetricsEventConsumer {
    /// Updates the store for `ConnectReceived` and `StreamClosed` events and
    /// ignores every other event kind.
    fn consume(&self, envelope: EventEnvelope) {
        match envelope.event.kind {
            EventType::ConnectReceived => self.store.record_connection_open(),
            EventType::StreamClosed => self.on_stream_closed(&envelope),
            _ => {}
        }
    }
}

/// Returns `true` when `reason_detail` mentions a timeout.
///
/// The check is a substring search for `"timed out"` or `"timeout"` that
/// ignores ASCII letter case.
fn is_timeout_reason(reason_detail: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 2] = ["timed out", "timeout"];

    let normalized = reason_detail.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|&marker| normalized.contains(marker))
}

/// Interprets a raw `SOTH_PROXY_STREAM_CLOSED_TRACE` value as an on/off flag.
///
/// Surrounding whitespace is ignored, and `1`, `true` and `yes` are accepted
/// in any ASCII letter case (so `True` and `Yes` work as well as `true` and
/// `YES`). Every other value means "off".
fn parse_trace_flag(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "yes"]
        .iter()
        .any(|&enabled| value.eq_ignore_ascii_case(enabled))
}

/// Reports whether "stream closed" diagnostics should be logged.
///
/// The `SOTH_PROXY_STREAM_CLOSED_TRACE` environment variable is read on the
/// first call only; the answer is cached for the rest of the process, so
/// changing the variable afterwards has no effect. An unset or non-Unicode
/// variable disables the diagnostics.
fn stream_closed_trace_enabled() -> bool {
    static STREAM_CLOSED_TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
    *STREAM_CLOSED_TRACE_ENABLED.get_or_init(|| {
        std::env::var("SOTH_PROXY_STREAM_CLOSED_TRACE")
            .is_ok_and(|value| parse_trace_flag(&value))
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::observe::{Event, EventConsumer, EventEnvelope, EventType, FlowContext};
    use crate::protocol::ApplicationProtocol;

    use super::{MetricsEventConsumer, ProxyMetrics, ProxyMetricsStore};

    /// Flow context with fixed endpoints; only the flow id varies.
    fn sample_context(flow_id: u64) -> FlowContext {
        use crate::types::FlowId;
        FlowContext {
            flow_id: FlowId(flow_id),
            client_addr: "127.0.0.1:1234".to_string(),
            server_host: "api.example.com".to_string(),
            server_port: 443,
            protocol: ApplicationProtocol::Http1,
        }
    }

    /// Wraps a new event of `kind` for `flow_id`, carrying the given string
    /// attributes, in an envelope ready to hand to a consumer.
    fn envelope(kind: EventType, flow_id: u64, attributes: &[(&str, &str)]) -> EventEnvelope {
        let mut event = Event::new(kind, sample_context(flow_id));
        for &(key, value) in attributes {
            event.attributes.insert(key.to_string(), value.to_string());
        }
        EventEnvelope::from_event(event)
    }

    #[test]
    fn proxy_metrics_counter_contract() {
        let store = ProxyMetricsStore::default();

        store.record_connection_open();
        store.record_connection_open();
        store.record_connection_close();
        store.record_handler_timeout();
        store.record_handler_panic();

        // Only the counters touched above may be non-zero.
        let expected = ProxyMetrics {
            total_connections: 2,
            active_connections: 1,
            handler_timeout_count: 1,
            handler_panic_count: 1,
            ..ProxyMetrics::default()
        };
        assert_eq!(store.snapshot(), expected);
    }

    #[test]
    fn missing_connection_meta_counter_increments() {
        let store = ProxyMetricsStore::default();
        for _ in 0..2 {
            store.record_missing_connection_meta();
        }
        assert_eq!(store.snapshot().missing_connection_meta_count, 2);
    }

    #[test]
    fn upstream_failure_metrics_are_wired_from_stream_closed_events() {
        let store = Arc::new(ProxyMetricsStore::default());
        let consumer = MetricsEventConsumer::new(Arc::clone(&store));

        // Flow 1: the connect attempt fails and the detail text names a
        // timeout, so both the connect-error and timeout counters move.
        consumer.consume(envelope(EventType::ConnectReceived, 1, &[]));
        consumer.consume(envelope(
            EventType::StreamClosed,
            1,
            &[
                ("reason_code", "upstream_connect_failed"),
                ("reason_detail", "connect timeout"),
            ],
        ));

        // Flow 2: a stage timeout counts as a timeout only.
        consumer.consume(envelope(EventType::ConnectReceived, 2, &[]));
        consumer.consume(envelope(
            EventType::StreamClosed,
            2,
            &[("reason_code", "stream_stage_timeout")],
        ));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_connections, 2);
        assert_eq!(snapshot.upstream_connect_error_count, 1);
        assert_eq!(snapshot.upstream_timeout_count, 2);
    }
}