1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::OnceLock;
3
4use crate::observe::{EventConsumer, EventEnvelope, EventType};
5
/// Plain-value copy of the proxy counters, as returned by
/// `ProxyMetricsStore::snapshot`.
///
/// Every field is a `u64`; `Default` yields an all-zero value.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProxyMetrics {
    // --- connection gauge / total ---
    /// Connections currently counted as open: raised by
    /// `record_connection_open`, lowered (never below zero) by
    /// `record_connection_close`.
    pub active_connections: u64,
    /// Number of `record_connection_open` calls.
    pub total_connections: u64,

    // --- handler outcomes ---
    /// Number of `record_handler_panic` calls.
    pub handler_panic_count: u64,
    /// Number of `record_handler_timeout` calls.
    pub handler_timeout_count: u64,

    // --- upstream failures ---
    /// Number of `record_upstream_connect_error` calls.
    pub upstream_connect_error_count: u64,
    /// Number of `record_upstream_timeout` calls.
    pub upstream_timeout_count: u64,

    // --- process attribution and its cache ---
    /// Number of `record_process_attribution_failure` calls.
    pub process_attribution_failure_count: u64,
    /// Number of `record_process_attribution_timeout` calls.
    pub process_attribution_timeout_count: u64,
    /// Number of `record_process_cache_connection_hit` calls.
    pub process_cache_connection_hit_count: u64,
    /// Number of `record_process_cache_identity_hit` calls.
    pub process_cache_identity_hit_count: u64,
    /// Number of `record_process_cache_miss` calls.
    pub process_cache_miss_count: u64,
    /// Number of `record_process_cache_eviction` calls.
    pub process_cache_eviction_count: u64,
    /// Number of `record_process_pid_reuse_detected` calls.
    pub process_pid_reuse_detected_count: u64,

    // --- dispatch / flow bookkeeping ---
    /// Number of `record_dispatch_drop` calls.
    pub dropped_dispatch_work_count: u64,
    /// Number of `record_stale_flow_reap` calls.
    pub stale_flow_reap_count: u64,
    /// Number of `record_closed_flow_id_eviction` calls.
    pub closed_flow_id_eviction_count: u64,
    /// Number of `record_missing_connection_meta` calls.
    pub missing_connection_meta_count: u64,
}
27
/// Atomic counters behind [`ProxyMetrics`].
///
/// The fields mirror `ProxyMetrics` one-to-one (same names, same order), each
/// held in an `AtomicU64` so it can be updated through a shared reference.
/// `Default` starts every counter at zero.
#[derive(Default, Debug)]
pub(crate) struct ProxyMetricsStore {
    // Connection gauge and running total.
    active_connections: AtomicU64,
    total_connections: AtomicU64,

    // Handler outcomes.
    handler_panic_count: AtomicU64,
    handler_timeout_count: AtomicU64,

    // Upstream failures.
    upstream_connect_error_count: AtomicU64,
    upstream_timeout_count: AtomicU64,

    // Process attribution and its cache.
    process_attribution_failure_count: AtomicU64,
    process_attribution_timeout_count: AtomicU64,
    process_cache_connection_hit_count: AtomicU64,
    process_cache_identity_hit_count: AtomicU64,
    process_cache_miss_count: AtomicU64,
    process_cache_eviction_count: AtomicU64,
    process_pid_reuse_detected_count: AtomicU64,

    // Dispatch and flow bookkeeping.
    dropped_dispatch_work_count: AtomicU64,
    stale_flow_reap_count: AtomicU64,
    closed_flow_id_eviction_count: AtomicU64,
    missing_connection_meta_count: AtomicU64,
}
48
49impl ProxyMetricsStore {
50 pub(crate) fn snapshot(&self) -> ProxyMetrics {
51 ProxyMetrics {
52 active_connections: self.active_connections.load(Ordering::Relaxed),
53 total_connections: self.total_connections.load(Ordering::Relaxed),
54 handler_panic_count: self.handler_panic_count.load(Ordering::Relaxed),
55 handler_timeout_count: self.handler_timeout_count.load(Ordering::Relaxed),
56 upstream_connect_error_count: self.upstream_connect_error_count.load(Ordering::Relaxed),
57 upstream_timeout_count: self.upstream_timeout_count.load(Ordering::Relaxed),
58 process_attribution_failure_count: self
59 .process_attribution_failure_count
60 .load(Ordering::Relaxed),
61 process_attribution_timeout_count: self
62 .process_attribution_timeout_count
63 .load(Ordering::Relaxed),
64 process_cache_connection_hit_count: self
65 .process_cache_connection_hit_count
66 .load(Ordering::Relaxed),
67 process_cache_identity_hit_count: self
68 .process_cache_identity_hit_count
69 .load(Ordering::Relaxed),
70 process_cache_miss_count: self.process_cache_miss_count.load(Ordering::Relaxed),
71 process_cache_eviction_count: self.process_cache_eviction_count.load(Ordering::Relaxed),
72 process_pid_reuse_detected_count: self
73 .process_pid_reuse_detected_count
74 .load(Ordering::Relaxed),
75 dropped_dispatch_work_count: self.dropped_dispatch_work_count.load(Ordering::Relaxed),
76 stale_flow_reap_count: self.stale_flow_reap_count.load(Ordering::Relaxed),
77 closed_flow_id_eviction_count: self
78 .closed_flow_id_eviction_count
79 .load(Ordering::Relaxed),
80 missing_connection_meta_count: self
81 .missing_connection_meta_count
82 .load(Ordering::Relaxed),
83 }
84 }
85
86 pub(crate) fn record_connection_open(&self) {
87 self.total_connections.fetch_add(1, Ordering::Relaxed);
88 self.active_connections.fetch_add(1, Ordering::Relaxed);
89 }
90
91 pub(crate) fn record_connection_close(&self) {
92 let _ =
93 self.active_connections
94 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
95 Some(current.saturating_sub(1))
96 });
97 }
98
99 pub(crate) fn record_handler_panic(&self) {
100 self.handler_panic_count.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub(crate) fn record_handler_timeout(&self) {
104 self.handler_timeout_count.fetch_add(1, Ordering::Relaxed);
105 }
106
107 pub(crate) fn record_upstream_connect_error(&self) {
108 self.upstream_connect_error_count
109 .fetch_add(1, Ordering::Relaxed);
110 }
111
112 pub(crate) fn record_upstream_timeout(&self) {
113 self.upstream_timeout_count.fetch_add(1, Ordering::Relaxed);
114 }
115
116 pub(crate) fn record_process_attribution_failure(&self) {
117 self.process_attribution_failure_count
118 .fetch_add(1, Ordering::Relaxed);
119 }
120
121 pub(crate) fn record_process_attribution_timeout(&self) {
122 self.process_attribution_timeout_count
123 .fetch_add(1, Ordering::Relaxed);
124 }
125
126 pub(crate) fn record_process_cache_connection_hit(&self) {
127 self.process_cache_connection_hit_count
128 .fetch_add(1, Ordering::Relaxed);
129 }
130
131 pub(crate) fn record_process_cache_identity_hit(&self) {
132 self.process_cache_identity_hit_count
133 .fetch_add(1, Ordering::Relaxed);
134 }
135
136 pub(crate) fn record_process_cache_miss(&self) {
137 self.process_cache_miss_count
138 .fetch_add(1, Ordering::Relaxed);
139 }
140
141 pub(crate) fn record_process_cache_eviction(&self) {
142 self.process_cache_eviction_count
143 .fetch_add(1, Ordering::Relaxed);
144 }
145
146 pub(crate) fn record_process_pid_reuse_detected(&self) {
147 self.process_pid_reuse_detected_count
148 .fetch_add(1, Ordering::Relaxed);
149 }
150
151 pub(crate) fn record_dispatch_drop(&self) {
152 self.dropped_dispatch_work_count
153 .fetch_add(1, Ordering::Relaxed);
154 }
155
156 pub(crate) fn record_stale_flow_reap(&self) {
157 self.stale_flow_reap_count.fetch_add(1, Ordering::Relaxed);
158 }
159
160 pub(crate) fn record_closed_flow_id_eviction(&self) {
161 self.closed_flow_id_eviction_count
162 .fetch_add(1, Ordering::Relaxed);
163 }
164
165 pub(crate) fn record_missing_connection_meta(&self) {
166 self.missing_connection_meta_count
167 .fetch_add(1, Ordering::Relaxed);
168 }
169}
170
171#[derive(Debug)]
172pub(crate) struct MetricsEventConsumer {
173 store: std::sync::Arc<ProxyMetricsStore>,
174}
175
176impl MetricsEventConsumer {
177 pub(crate) fn new(store: std::sync::Arc<ProxyMetricsStore>) -> Self {
178 Self { store }
179 }
180}
181
182impl EventConsumer for MetricsEventConsumer {
183 fn consume(&self, envelope: EventEnvelope) {
184 match envelope.event.kind {
185 EventType::ConnectReceived => self.store.record_connection_open(),
186 EventType::StreamClosed => {
187 self.store.record_connection_close();
188 let reason_code = envelope
189 .event
190 .attributes
191 .get("reason_code")
192 .map(std::string::String::as_str);
193 let reason_detail = envelope
194 .event
195 .attributes
196 .get("reason_detail")
197 .map(std::string::String::as_str)
198 .unwrap_or_default();
199 if stream_closed_trace_enabled() {
200 tracing::warn!(
201 flow_id = envelope.event.context.flow_id.as_u64(),
202 server_host = %envelope.event.context.server_host,
203 server_port = envelope.event.context.server_port,
204 protocol = ?envelope.event.context.protocol,
205 reason_code = reason_code.unwrap_or("unknown"),
206 reason_detail = reason_detail,
207 "stream closed diagnostic"
208 );
209 }
210 match reason_code {
211 Some("upstream_connect_failed") => {
212 self.store.record_upstream_connect_error();
213 if is_timeout_reason(reason_detail) {
214 self.store.record_upstream_timeout();
215 }
216 }
217 Some("stream_stage_timeout") => {
218 self.store.record_upstream_timeout();
219 }
220 _ => {}
221 }
222 }
223 _ => {}
224 }
225 }
226}
227
/// Returns `true` when `reason_detail` mentions a timeout.
///
/// The check is ASCII case-insensitive and looks for either `"timed out"` or
/// `"timeout"` anywhere in the text.
fn is_timeout_reason(reason_detail: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 2] = ["timed out", "timeout"];

    let normalized = reason_detail.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|marker| normalized.contains(*marker))
}
232
/// Reports whether the "stream closed" diagnostic log line is switched on.
///
/// The switch is the `SOTH_PROXY_STREAM_CLOSED_TRACE` environment variable.
/// It is read once, on the first call, and the answer is cached for the rest
/// of the process; later changes to the variable have no effect.
///
/// The variable enables the trace when its value is `1`, `true` or `yes`,
/// ignoring ASCII case and surrounding whitespace. An unset, non-Unicode or
/// any other value leaves the trace off.
fn stream_closed_trace_enabled() -> bool {
    static STREAM_CLOSED_TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
    *STREAM_CLOSED_TRACE_ENABLED.get_or_init(|| {
        std::env::var("SOTH_PROXY_STREAM_CLOSED_TRACE")
            .map(|value| is_enabled_flag_value(&value))
            // `Err` covers both "not set" and "not valid Unicode".
            .unwrap_or(false)
    })
}

/// Returns `true` when `value` spells an "on" flag: `1`, `true` or `yes`,
/// compared without regard to ASCII case after trimming whitespace.
fn is_enabled_flag_value(value: &str) -> bool {
    const ENABLED_VALUES: [&str; 3] = ["1", "true", "yes"];

    let value = value.trim();
    ENABLED_VALUES
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
242
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::observe::{Event, EventConsumer, EventEnvelope, EventType, FlowContext};
    use crate::protocol::ApplicationProtocol;
    use crate::types::FlowId;

    use super::{MetricsEventConsumer, ProxyMetrics, ProxyMetricsStore};

    /// Flow context with fixed endpoints; only the flow id varies.
    fn sample_context(flow_id: u64) -> FlowContext {
        FlowContext {
            flow_id: FlowId(flow_id),
            client_addr: "127.0.0.1:1234".to_string(),
            server_host: "api.example.com".to_string(),
            server_port: 443,
            protocol: ApplicationProtocol::Http1,
        }
    }

    /// Envelope for a `ConnectReceived` event on `flow_id`.
    fn connect_received(flow_id: u64) -> EventEnvelope {
        EventEnvelope::from_event(Event::new(
            EventType::ConnectReceived,
            sample_context(flow_id),
        ))
    }

    /// Envelope for a `StreamClosed` event on `flow_id` carrying the given
    /// string attributes.
    fn stream_closed(flow_id: u64, attributes: &[(&str, &str)]) -> EventEnvelope {
        let mut event = Event::new(EventType::StreamClosed, sample_context(flow_id));
        for (key, value) in attributes {
            event
                .attributes
                .insert((*key).to_owned(), (*value).to_owned());
        }
        EventEnvelope::from_event(event)
    }

    #[test]
    fn proxy_metrics_counter_contract() {
        let metrics = ProxyMetricsStore::default();

        // Two opens and one close leave one connection active.
        metrics.record_connection_open();
        metrics.record_connection_open();
        metrics.record_connection_close();
        metrics.record_handler_timeout();
        metrics.record_handler_panic();

        // Every counter that was not touched must still be zero.
        let expected = ProxyMetrics {
            total_connections: 2,
            active_connections: 1,
            handler_timeout_count: 1,
            handler_panic_count: 1,
            ..ProxyMetrics::default()
        };
        assert_eq!(metrics.snapshot(), expected);
    }

    #[test]
    fn missing_connection_meta_counter_increments() {
        let metrics = ProxyMetricsStore::default();
        for _ in 0..2 {
            metrics.record_missing_connection_meta();
        }
        assert_eq!(metrics.snapshot().missing_connection_meta_count, 2);
    }

    #[test]
    fn upstream_failure_metrics_are_wired_from_stream_closed_events() {
        let store = Arc::new(ProxyMetricsStore::default());
        let consumer = MetricsEventConsumer::new(Arc::clone(&store));

        // Flow 1: connect failure whose detail mentions a timeout, so it
        // counts as both a connect error and a timeout.
        consumer.consume(connect_received(1));
        consumer.consume(stream_closed(
            1,
            &[
                ("reason_code", "upstream_connect_failed"),
                ("reason_detail", "connect timeout"),
            ],
        ));

        // Flow 2: stage timeout, which counts as a timeout only.
        consumer.consume(connect_received(2));
        consumer.consume(stream_closed(
            2,
            &[("reason_code", "stream_stage_timeout")],
        ));

        let observed = store.snapshot();
        assert_eq!(observed.active_connections, 0);
        assert_eq!(observed.total_connections, 2);
        assert_eq!(observed.upstream_connect_error_count, 1);
        assert_eq!(observed.upstream_timeout_count, 2);
    }
}