Skip to main content

soth_mitm/
metrics.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::OnceLock;
3
4use crate::observe::{EventConsumer, EventEnvelope, EventType};
5
/// Plain-value copy of the proxy's operational counters at one instant.
///
/// Values of this type are produced by `ProxyMetricsStore::snapshot`; every
/// field is an ordinary `u64`, and `Default` yields all zeros.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProxyMetrics {
    /// Connections opened and not yet closed (never drops below zero).
    pub active_connections: u64,
    /// Connections opened so far.
    pub total_connections: u64,
    /// Handler panics recorded.
    pub handler_panic_count: u64,
    /// Handler timeouts recorded.
    pub handler_timeout_count: u64,
    /// Upstream connect errors recorded.
    pub upstream_connect_error_count: u64,
    /// Upstream timeouts recorded.
    pub upstream_timeout_count: u64,
    /// Process attribution failures recorded.
    pub process_attribution_failure_count: u64,
    /// Process attribution timeouts recorded.
    pub process_attribution_timeout_count: u64,
    /// Process cache hits recorded as connection hits.
    pub process_cache_connection_hit_count: u64,
    /// Process cache hits recorded as identity hits.
    pub process_cache_identity_hit_count: u64,
    /// Process cache misses recorded.
    pub process_cache_miss_count: u64,
    /// Process cache evictions recorded.
    pub process_cache_eviction_count: u64,
    /// Detected PID reuses recorded.
    pub process_pid_reuse_detected_count: u64,
    /// Dispatch drops recorded.
    pub dropped_dispatch_work_count: u64,
    /// Stale flow reaps recorded.
    pub stale_flow_reap_count: u64,
    /// Closed flow-id evictions recorded.
    pub closed_flow_id_eviction_count: u64,
    /// Missing connection metadata occurrences recorded.
    pub missing_connection_meta_count: u64,
}
27
/// Live, shared counterpart of [`ProxyMetrics`].
///
/// Holds one `AtomicU64` per metric so counters can be updated through a
/// shared reference. `Default` starts every counter at zero.
#[derive(Default, Debug)]
pub(crate) struct ProxyMetricsStore {
    // Connection gauges.
    active_connections: AtomicU64,
    total_connections: AtomicU64,

    // Handler outcomes.
    handler_panic_count: AtomicU64,
    handler_timeout_count: AtomicU64,

    // Upstream failures.
    upstream_connect_error_count: AtomicU64,
    upstream_timeout_count: AtomicU64,

    // Process attribution and its cache.
    process_attribution_failure_count: AtomicU64,
    process_attribution_timeout_count: AtomicU64,
    process_cache_connection_hit_count: AtomicU64,
    process_cache_identity_hit_count: AtomicU64,
    process_cache_miss_count: AtomicU64,
    process_cache_eviction_count: AtomicU64,
    process_pid_reuse_detected_count: AtomicU64,

    // Dispatch and flow bookkeeping.
    dropped_dispatch_work_count: AtomicU64,
    stale_flow_reap_count: AtomicU64,
    closed_flow_id_eviction_count: AtomicU64,
    missing_connection_meta_count: AtomicU64,
}
48
49impl ProxyMetricsStore {
50    pub(crate) fn snapshot(&self) -> ProxyMetrics {
51        ProxyMetrics {
52            active_connections: self.active_connections.load(Ordering::Relaxed),
53            total_connections: self.total_connections.load(Ordering::Relaxed),
54            handler_panic_count: self.handler_panic_count.load(Ordering::Relaxed),
55            handler_timeout_count: self.handler_timeout_count.load(Ordering::Relaxed),
56            upstream_connect_error_count: self.upstream_connect_error_count.load(Ordering::Relaxed),
57            upstream_timeout_count: self.upstream_timeout_count.load(Ordering::Relaxed),
58            process_attribution_failure_count: self
59                .process_attribution_failure_count
60                .load(Ordering::Relaxed),
61            process_attribution_timeout_count: self
62                .process_attribution_timeout_count
63                .load(Ordering::Relaxed),
64            process_cache_connection_hit_count: self
65                .process_cache_connection_hit_count
66                .load(Ordering::Relaxed),
67            process_cache_identity_hit_count: self
68                .process_cache_identity_hit_count
69                .load(Ordering::Relaxed),
70            process_cache_miss_count: self.process_cache_miss_count.load(Ordering::Relaxed),
71            process_cache_eviction_count: self.process_cache_eviction_count.load(Ordering::Relaxed),
72            process_pid_reuse_detected_count: self
73                .process_pid_reuse_detected_count
74                .load(Ordering::Relaxed),
75            dropped_dispatch_work_count: self.dropped_dispatch_work_count.load(Ordering::Relaxed),
76            stale_flow_reap_count: self.stale_flow_reap_count.load(Ordering::Relaxed),
77            closed_flow_id_eviction_count: self
78                .closed_flow_id_eviction_count
79                .load(Ordering::Relaxed),
80            missing_connection_meta_count: self
81                .missing_connection_meta_count
82                .load(Ordering::Relaxed),
83        }
84    }
85
86    pub(crate) fn record_connection_open(&self) {
87        self.total_connections.fetch_add(1, Ordering::Relaxed);
88        self.active_connections.fetch_add(1, Ordering::Relaxed);
89    }
90
91    pub(crate) fn record_connection_close(&self) {
92        let _ =
93            self.active_connections
94                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
95                    Some(current.saturating_sub(1))
96                });
97    }
98
99    pub(crate) fn record_handler_panic(&self) {
100        self.handler_panic_count.fetch_add(1, Ordering::Relaxed);
101    }
102
103    pub(crate) fn record_handler_timeout(&self) {
104        self.handler_timeout_count.fetch_add(1, Ordering::Relaxed);
105    }
106
107    pub(crate) fn record_upstream_connect_error(&self) {
108        self.upstream_connect_error_count
109            .fetch_add(1, Ordering::Relaxed);
110    }
111
112    pub(crate) fn record_upstream_timeout(&self) {
113        self.upstream_timeout_count.fetch_add(1, Ordering::Relaxed);
114    }
115
116    pub(crate) fn record_process_attribution_failure(&self) {
117        self.process_attribution_failure_count
118            .fetch_add(1, Ordering::Relaxed);
119    }
120
121    pub(crate) fn record_process_attribution_timeout(&self) {
122        self.process_attribution_timeout_count
123            .fetch_add(1, Ordering::Relaxed);
124    }
125
126    pub(crate) fn record_process_cache_connection_hit(&self) {
127        self.process_cache_connection_hit_count
128            .fetch_add(1, Ordering::Relaxed);
129    }
130
131    pub(crate) fn record_process_cache_identity_hit(&self) {
132        self.process_cache_identity_hit_count
133            .fetch_add(1, Ordering::Relaxed);
134    }
135
136    pub(crate) fn record_process_cache_miss(&self) {
137        self.process_cache_miss_count
138            .fetch_add(1, Ordering::Relaxed);
139    }
140
141    pub(crate) fn record_process_cache_eviction(&self) {
142        self.process_cache_eviction_count
143            .fetch_add(1, Ordering::Relaxed);
144    }
145
146    pub(crate) fn record_process_pid_reuse_detected(&self) {
147        self.process_pid_reuse_detected_count
148            .fetch_add(1, Ordering::Relaxed);
149    }
150
151    pub(crate) fn record_dispatch_drop(&self) {
152        self.dropped_dispatch_work_count
153            .fetch_add(1, Ordering::Relaxed);
154    }
155
156    pub(crate) fn record_stale_flow_reap(&self) {
157        self.stale_flow_reap_count.fetch_add(1, Ordering::Relaxed);
158    }
159
160    pub(crate) fn record_closed_flow_id_eviction(&self) {
161        self.closed_flow_id_eviction_count
162            .fetch_add(1, Ordering::Relaxed);
163    }
164
165    pub(crate) fn record_missing_connection_meta(&self) {
166        self.missing_connection_meta_count
167            .fetch_add(1, Ordering::Relaxed);
168    }
169}
170
171#[derive(Debug)]
172pub(crate) struct MetricsEventConsumer {
173    store: std::sync::Arc<ProxyMetricsStore>,
174}
175
176impl MetricsEventConsumer {
177    pub(crate) fn new(store: std::sync::Arc<ProxyMetricsStore>) -> Self {
178        Self { store }
179    }
180}
181
182impl EventConsumer for MetricsEventConsumer {
183    fn consume(&self, envelope: EventEnvelope) {
184        match envelope.event.kind {
185            EventType::ConnectReceived => self.store.record_connection_open(),
186            EventType::StreamClosed => {
187                self.store.record_connection_close();
188                let reason_code = envelope
189                    .event
190                    .attributes
191                    .get("reason_code")
192                    .map(std::string::String::as_str);
193                let reason_detail = envelope
194                    .event
195                    .attributes
196                    .get("reason_detail")
197                    .map(std::string::String::as_str)
198                    .unwrap_or_default();
199                if stream_closed_trace_enabled() {
200                    tracing::warn!(
201                        flow_id = envelope.event.context.flow_id.as_u64(),
202                        server_host = %envelope.event.context.server_host,
203                        server_port = envelope.event.context.server_port,
204                        protocol = ?envelope.event.context.protocol,
205                        reason_code = reason_code.unwrap_or("unknown"),
206                        reason_detail = reason_detail,
207                        "stream closed diagnostic"
208                    );
209                }
210                match reason_code {
211                    Some("upstream_connect_failed") => {
212                        self.store.record_upstream_connect_error();
213                        if is_timeout_reason(reason_detail) {
214                            self.store.record_upstream_timeout();
215                        }
216                    }
217                    Some("stream_stage_timeout") => {
218                        self.store.record_upstream_timeout();
219                    }
220                    _ => {}
221                }
222            }
223            _ => {}
224        }
225    }
226}
227
/// Reports whether `reason_detail` mentions a timeout.
///
/// The check is ASCII-case-insensitive and looks for the substrings
/// `"timed out"` or `"timeout"`.
fn is_timeout_reason(reason_detail: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 2] = ["timed out", "timeout"];

    let normalized = reason_detail.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|&marker| normalized.contains(marker))
}
232
/// Reports whether stream-closed diagnostics were requested via the
/// `SOTH_PROXY_STREAM_CLOSED_TRACE` environment variable.
///
/// The variable is read on the first call only and the result is cached for
/// the rest of the process, so later changes to the environment have no
/// effect. The flag is on when the value, after trimming surrounding
/// whitespace, equals `1`, `true` or `yes` in any ASCII letter case. An unset
/// or non-Unicode value leaves it off.
fn stream_closed_trace_enabled() -> bool {
    const TRUTHY_VALUES: [&str; 3] = ["1", "true", "yes"];

    static STREAM_CLOSED_TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
    *STREAM_CLOSED_TRACE_ENABLED.get_or_init(|| {
        std::env::var("SOTH_PROXY_STREAM_CLOSED_TRACE").is_ok_and(|raw| {
            let flag = raw.trim();
            // Case-insensitive so that `True` / `Yes` behave like `true` / `YES`.
            TRUTHY_VALUES
                .iter()
                .any(|truthy| flag.eq_ignore_ascii_case(truthy))
        })
    })
}
242
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::observe::{Event, EventConsumer, EventEnvelope, EventType, FlowContext};
    use crate::protocol::ApplicationProtocol;
    use crate::types::FlowId;

    use super::{MetricsEventConsumer, ProxyMetrics, ProxyMetricsStore};

    /// Builds a flow context that varies only in its flow id.
    fn sample_context(flow_id: u64) -> FlowContext {
        FlowContext {
            flow_id: FlowId(flow_id),
            client_addr: "127.0.0.1:1234".to_string(),
            server_host: "api.example.com".to_string(),
            server_port: 443,
            protocol: ApplicationProtocol::Http1,
        }
    }

    /// Envelope carrying a `ConnectReceived` event for `flow_id`.
    fn connect_received(flow_id: u64) -> EventEnvelope {
        EventEnvelope::from_event(Event::new(
            EventType::ConnectReceived,
            sample_context(flow_id),
        ))
    }

    /// Envelope carrying a `StreamClosed` event for `flow_id` with the given
    /// string attributes attached.
    fn stream_closed(flow_id: u64, attributes: &[(&str, &str)]) -> EventEnvelope {
        let mut event = Event::new(EventType::StreamClosed, sample_context(flow_id));
        for &(key, value) in attributes {
            event.attributes.insert(key.to_string(), value.to_string());
        }
        EventEnvelope::from_event(event)
    }

    #[test]
    fn proxy_metrics_counter_contract() {
        let store = ProxyMetricsStore::default();

        store.record_connection_open();
        store.record_connection_open();
        store.record_connection_close();
        store.record_handler_timeout();
        store.record_handler_panic();

        // Every counter that was not touched above must still be zero.
        let expected = ProxyMetrics {
            total_connections: 2,
            active_connections: 1,
            handler_timeout_count: 1,
            handler_panic_count: 1,
            ..ProxyMetrics::default()
        };
        assert_eq!(store.snapshot(), expected);
    }

    #[test]
    fn missing_connection_meta_counter_increments() {
        let store = ProxyMetricsStore::default();
        for _ in 0..2 {
            store.record_missing_connection_meta();
        }
        assert_eq!(store.snapshot().missing_connection_meta_count, 2);
    }

    #[test]
    fn upstream_failure_metrics_are_wired_from_stream_closed_events() {
        let store = Arc::new(ProxyMetricsStore::default());
        let consumer = MetricsEventConsumer::new(Arc::clone(&store));

        // Flow 1: connect failure whose detail mentions a timeout.
        consumer.consume(connect_received(1));
        consumer.consume(stream_closed(
            1,
            &[
                ("reason_code", "upstream_connect_failed"),
                ("reason_detail", "connect timeout"),
            ],
        ));

        // Flow 2: stage timeout without any detail.
        consumer.consume(connect_received(2));
        consumer.consume(stream_closed(
            2,
            &[("reason_code", "stream_stage_timeout")],
        ));

        let snapshot = store.snapshot();
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_connections, 2);
        assert_eq!(snapshot.upstream_connect_error_count, 1);
        assert_eq!(snapshot.upstream_timeout_count, 2);
    }
}