use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use crate::observe::{EventConsumer, EventEnvelope, EventType};
/// Point-in-time copy of the proxy's counters, produced by `ProxyMetricsStore::snapshot`.
///
/// Every field is a plain `u64`. Fields are loaded independently from the store, so a snapshot
/// taken while other threads are recording is not guaranteed to be mutually consistent.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProxyMetrics {
/// Connections currently open: +1 per `record_connection_open`, -1 per
/// `record_connection_close`, saturating at zero.
pub active_connections: u64,
/// Connections ever opened (bumped by `record_connection_open`); never decremented.
pub total_connections: u64,
/// Count of `record_handler_panic` calls.
pub handler_panic_count: u64,
/// Count of `record_handler_timeout` calls.
pub handler_timeout_count: u64,
/// Count of `record_upstream_connect_error` calls; the metrics event consumer bumps it for
/// `StreamClosed` events whose `reason_code` is `upstream_connect_failed`.
pub upstream_connect_error_count: u64,
/// Count of `record_upstream_timeout` calls; the metrics event consumer bumps it for
/// `upstream_connect_failed` closes whose `reason_detail` mentions a timeout, and for
/// `stream_stage_timeout` closes.
pub upstream_timeout_count: u64,
/// Count of `record_process_attribution_failure` calls.
pub process_attribution_failure_count: u64,
/// Count of `record_process_attribution_timeout` calls.
pub process_attribution_timeout_count: u64,
/// Count of `record_process_cache_connection_hit` calls.
pub process_cache_connection_hit_count: u64,
/// Count of `record_process_cache_identity_hit` calls.
pub process_cache_identity_hit_count: u64,
/// Count of `record_process_cache_miss` calls.
pub process_cache_miss_count: u64,
/// Count of `record_process_cache_eviction` calls.
pub process_cache_eviction_count: u64,
/// Count of `record_process_pid_reuse_detected` calls.
pub process_pid_reuse_detected_count: u64,
/// Count of `record_dispatch_drop` calls.
pub dropped_dispatch_work_count: u64,
/// Count of `record_stale_flow_reap` calls.
pub stale_flow_reap_count: u64,
/// Count of `record_closed_flow_id_eviction` calls.
pub closed_flow_id_eviction_count: u64,
/// Count of `record_missing_connection_meta` calls.
pub missing_connection_meta_count: u64,
}
/// Lock-free backing store for [`ProxyMetrics`].
///
/// Each counter is an independent `AtomicU64` updated with `Ordering::Relaxed`, so recorders
/// never block one another. Field order mirrors `ProxyMetrics` and is kept stable because it is
/// visible in the derived `Debug` output.
#[derive(Debug, Default)]
pub(crate) struct ProxyMetricsStore {
    active_connections: AtomicU64,
    total_connections: AtomicU64,
    handler_panic_count: AtomicU64,
    handler_timeout_count: AtomicU64,
    upstream_connect_error_count: AtomicU64,
    upstream_timeout_count: AtomicU64,
    process_attribution_failure_count: AtomicU64,
    process_attribution_timeout_count: AtomicU64,
    process_cache_connection_hit_count: AtomicU64,
    process_cache_identity_hit_count: AtomicU64,
    process_cache_miss_count: AtomicU64,
    process_cache_eviction_count: AtomicU64,
    process_pid_reuse_detected_count: AtomicU64,
    dropped_dispatch_work_count: AtomicU64,
    stale_flow_reap_count: AtomicU64,
    closed_flow_id_eviction_count: AtomicU64,
    missing_connection_meta_count: AtomicU64,
}

impl ProxyMetricsStore {
    /// Adds one to `counter`. Every monotonic counter in this store is bumped through here.
    fn bump_counter(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies every counter into a plain [`ProxyMetrics`] value.
    ///
    /// Fields are loaded one by one with `Relaxed` ordering. The result is therefore not an
    /// atomic cut across counters: updates racing with this call may show up in some fields
    /// and not in others.
    pub(crate) fn snapshot(&self) -> ProxyMetrics {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ProxyMetrics {
            active_connections: read(&self.active_connections),
            total_connections: read(&self.total_connections),
            handler_panic_count: read(&self.handler_panic_count),
            handler_timeout_count: read(&self.handler_timeout_count),
            upstream_connect_error_count: read(&self.upstream_connect_error_count),
            upstream_timeout_count: read(&self.upstream_timeout_count),
            process_attribution_failure_count: read(&self.process_attribution_failure_count),
            process_attribution_timeout_count: read(&self.process_attribution_timeout_count),
            process_cache_connection_hit_count: read(&self.process_cache_connection_hit_count),
            process_cache_identity_hit_count: read(&self.process_cache_identity_hit_count),
            process_cache_miss_count: read(&self.process_cache_miss_count),
            process_cache_eviction_count: read(&self.process_cache_eviction_count),
            process_pid_reuse_detected_count: read(&self.process_pid_reuse_detected_count),
            dropped_dispatch_work_count: read(&self.dropped_dispatch_work_count),
            stale_flow_reap_count: read(&self.stale_flow_reap_count),
            closed_flow_id_eviction_count: read(&self.closed_flow_id_eviction_count),
            missing_connection_meta_count: read(&self.missing_connection_meta_count),
        }
    }

    /// Records a newly opened connection by bumping the lifetime total and the live gauge.
    pub(crate) fn record_connection_open(&self) {
        Self::bump_counter(&self.total_connections);
        Self::bump_counter(&self.active_connections);
    }

    /// Records a closed connection by decrementing the live gauge.
    ///
    /// The decrement saturates at zero, so an unmatched close cannot wrap the gauge around
    /// to `u64::MAX`.
    pub(crate) fn record_connection_close(&self) {
        // The closure always yields `Some`, so `fetch_update` cannot fail; its result is ignored.
        let _ = self.active_connections.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |open| Some(open.saturating_sub(1)),
        );
    }

    /// Counts one connection-handler panic.
    pub(crate) fn record_handler_panic(&self) {
        Self::bump_counter(&self.handler_panic_count);
    }

    /// Counts one connection-handler timeout.
    pub(crate) fn record_handler_timeout(&self) {
        Self::bump_counter(&self.handler_timeout_count);
    }

    /// Counts one failed upstream connect attempt.
    pub(crate) fn record_upstream_connect_error(&self) {
        Self::bump_counter(&self.upstream_connect_error_count);
    }

    /// Counts one upstream timeout.
    pub(crate) fn record_upstream_timeout(&self) {
        Self::bump_counter(&self.upstream_timeout_count);
    }

    /// Counts one failed process-attribution lookup.
    pub(crate) fn record_process_attribution_failure(&self) {
        Self::bump_counter(&self.process_attribution_failure_count);
    }

    /// Counts one process-attribution lookup that timed out.
    pub(crate) fn record_process_attribution_timeout(&self) {
        Self::bump_counter(&self.process_attribution_timeout_count);
    }

    /// Counts one process-cache hit keyed by connection.
    pub(crate) fn record_process_cache_connection_hit(&self) {
        Self::bump_counter(&self.process_cache_connection_hit_count);
    }

    /// Counts one process-cache hit keyed by identity.
    pub(crate) fn record_process_cache_identity_hit(&self) {
        Self::bump_counter(&self.process_cache_identity_hit_count);
    }

    /// Counts one process-cache miss.
    pub(crate) fn record_process_cache_miss(&self) {
        Self::bump_counter(&self.process_cache_miss_count);
    }

    /// Counts one process-cache eviction.
    pub(crate) fn record_process_cache_eviction(&self) {
        Self::bump_counter(&self.process_cache_eviction_count);
    }

    /// Counts one detected PID reuse.
    pub(crate) fn record_process_pid_reuse_detected(&self) {
        Self::bump_counter(&self.process_pid_reuse_detected_count);
    }

    /// Counts one unit of dispatch work that was dropped.
    pub(crate) fn record_dispatch_drop(&self) {
        Self::bump_counter(&self.dropped_dispatch_work_count);
    }

    /// Counts one stale flow that was reaped.
    pub(crate) fn record_stale_flow_reap(&self) {
        Self::bump_counter(&self.stale_flow_reap_count);
    }

    /// Counts one eviction of a closed flow id.
    pub(crate) fn record_closed_flow_id_eviction(&self) {
        Self::bump_counter(&self.closed_flow_id_eviction_count);
    }

    /// Counts one occurrence of missing connection metadata.
    pub(crate) fn record_missing_connection_meta(&self) {
        Self::bump_counter(&self.missing_connection_meta_count);
    }
}
/// Event consumer that turns observed proxy events into counter updates on a shared
/// [`ProxyMetricsStore`].
#[derive(Debug)]
pub(crate) struct MetricsEventConsumer {
    /// Counter store that this consumer records into.
    store: std::sync::Arc<ProxyMetricsStore>,
}

impl MetricsEventConsumer {
    /// Builds a consumer that records every event it consumes into `store`.
    pub(crate) fn new(store: std::sync::Arc<ProxyMetricsStore>) -> Self {
        MetricsEventConsumer { store }
    }
}
impl EventConsumer for MetricsEventConsumer {
    /// Folds one observed event into the shared [`ProxyMetricsStore`].
    ///
    /// * `ConnectReceived` records a connection open.
    /// * `StreamClosed` records a connection close. It optionally emits a diagnostic `warn!`
    ///   when `stream_closed_trace_enabled()` is true. It then classifies the `reason_code`
    ///   attribute:
    ///   - `upstream_connect_failed` bumps the connect-error counter, and also the
    ///     upstream-timeout counter when `reason_detail` looks like a timeout.
    ///   - `stream_stage_timeout` bumps only the upstream-timeout counter.
    /// * Every other event kind is ignored.
    fn consume(&self, envelope: EventEnvelope) {
        let event = &envelope.event;
        if matches!(event.kind, EventType::ConnectReceived) {
            self.store.record_connection_open();
            return;
        }
        if !matches!(event.kind, EventType::StreamClosed) {
            // No other event kind feeds these counters.
            return;
        }

        self.store.record_connection_close();

        let attributes = &event.attributes;
        let reason_code = attributes.get("reason_code").map(String::as_str);
        // A missing detail is treated as an empty string.
        let reason_detail = attributes.get("reason_detail").map_or("", String::as_str);

        if stream_closed_trace_enabled() {
            let context = &event.context;
            tracing::warn!(
                flow_id = context.flow_id.as_u64(),
                server_host = %context.server_host,
                server_port = context.server_port,
                protocol = ?context.protocol,
                reason_code = reason_code.unwrap_or("unknown"),
                reason_detail,
                "stream closed diagnostic"
            );
        }

        let counts_as_upstream_timeout = match reason_code {
            Some("upstream_connect_failed") => {
                self.store.record_upstream_connect_error();
                is_timeout_reason(reason_detail)
            }
            Some("stream_stage_timeout") => true,
            _ => false,
        };
        if counts_as_upstream_timeout {
            self.store.record_upstream_timeout();
        }
    }
}
/// Returns `true` when `reason_detail` contains `timed out` or `timeout`.
///
/// The comparison is ASCII-case-insensitive. Non-ASCII characters must match exactly. The
/// search runs over byte windows, so no lowercase copy of the input is allocated.
fn is_timeout_reason(reason_detail: &str) -> bool {
    const NEEDLES: [&str; 2] = ["timed out", "timeout"];
    let haystack = reason_detail.as_bytes();
    NEEDLES.iter().any(|needle| {
        haystack
            .windows(needle.len())
            .any(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
    })
}
/// Returns whether per-stream `StreamClosed` diagnostics should be logged.
///
/// Reads `SOTH_PROXY_STREAM_CLOSED_TRACE` once and caches the answer in a process-wide
/// `OnceLock`. Later changes to the variable are not observed. An unset or non-UTF-8 variable
/// disables the trace. Otherwise the value is interpreted by [`is_truthy_trace_flag`].
fn stream_closed_trace_enabled() -> bool {
    static STREAM_CLOSED_TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
    *STREAM_CLOSED_TRACE_ENABLED.get_or_init(|| {
        std::env::var("SOTH_PROXY_STREAM_CLOSED_TRACE")
            .map(|value| is_truthy_trace_flag(&value))
            .unwrap_or(false)
    })
}

/// Interprets an environment-variable value as an on/off switch.
///
/// Accepts `1`, `true`, or `yes` in any ASCII case, ignoring surrounding whitespace.
/// Previously only all-lower or all-upper spellings matched, so `True`/`Yes` silently disabled
/// the trace. Any other value counts as off.
fn is_truthy_trace_flag(value: &str) -> bool {
    let value = value.trim();
    ["1", "true", "yes"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
#[cfg(test)]
mod tests {
    use crate::observe::{Event, EventConsumer, EventEnvelope, EventType, FlowContext};
    use crate::protocol::ApplicationProtocol;

    use super::ProxyMetricsStore;

    #[test]
    fn proxy_metrics_counter_contract() {
        let store = ProxyMetricsStore::default();
        store.record_connection_open();
        store.record_connection_open();
        store.record_connection_close();
        store.record_handler_timeout();
        store.record_handler_panic();
        let snapshot = store.snapshot();
        assert_eq!(snapshot.total_connections, 2);
        assert_eq!(snapshot.active_connections, 1);
        assert_eq!(snapshot.handler_timeout_count, 1);
        assert_eq!(snapshot.handler_panic_count, 1);
        assert_eq!(snapshot.upstream_connect_error_count, 0);
        assert_eq!(snapshot.upstream_timeout_count, 0);
        assert_eq!(snapshot.process_attribution_failure_count, 0);
        assert_eq!(snapshot.process_attribution_timeout_count, 0);
        assert_eq!(snapshot.process_cache_connection_hit_count, 0);
        assert_eq!(snapshot.process_cache_identity_hit_count, 0);
        assert_eq!(snapshot.process_cache_miss_count, 0);
        assert_eq!(snapshot.process_cache_eviction_count, 0);
        assert_eq!(snapshot.process_pid_reuse_detected_count, 0);
        assert_eq!(snapshot.dropped_dispatch_work_count, 0);
        assert_eq!(snapshot.stale_flow_reap_count, 0);
        assert_eq!(snapshot.closed_flow_id_eviction_count, 0);
        assert_eq!(snapshot.missing_connection_meta_count, 0);
    }

    #[test]
    fn missing_connection_meta_counter_increments() {
        let store = ProxyMetricsStore::default();
        store.record_missing_connection_meta();
        store.record_missing_connection_meta();
        assert_eq!(store.snapshot().missing_connection_meta_count, 2);
    }

    #[test]
    fn upstream_failure_metrics_are_wired_from_stream_closed_events() {
        let store = std::sync::Arc::new(ProxyMetricsStore::default());
        let consumer = super::MetricsEventConsumer::new(std::sync::Arc::clone(&store));
        consumer.consume(EventEnvelope::from_event(Event::new(
            EventType::ConnectReceived,
            sample_context(1),
        )));
        let mut connect_failed = Event::new(EventType::StreamClosed, sample_context(1));
        connect_failed.attributes.insert(
            "reason_code".to_string(),
            "upstream_connect_failed".to_string(),
        );
        connect_failed
            .attributes
            .insert("reason_detail".to_string(), "connect timeout".to_string());
        consumer.consume(EventEnvelope::from_event(connect_failed));
        consumer.consume(EventEnvelope::from_event(Event::new(
            EventType::ConnectReceived,
            sample_context(2),
        )));
        let mut stage_timeout = Event::new(EventType::StreamClosed, sample_context(2));
        stage_timeout.attributes.insert(
            "reason_code".to_string(),
            "stream_stage_timeout".to_string(),
        );
        consumer.consume(EventEnvelope::from_event(stage_timeout));
        let snapshot = store.snapshot();
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_connections, 2);
        assert_eq!(snapshot.upstream_connect_error_count, 1);
        assert_eq!(snapshot.upstream_timeout_count, 2);
    }

    /// A connect failure whose detail does not mention a timeout must bump only the
    /// connect-error counter. A close with no matching open must leave the live gauge at zero
    /// instead of wrapping.
    #[test]
    fn connect_failure_without_timeout_detail_is_not_a_timeout() {
        let store = std::sync::Arc::new(ProxyMetricsStore::default());
        let consumer = super::MetricsEventConsumer::new(std::sync::Arc::clone(&store));
        let mut refused = Event::new(EventType::StreamClosed, sample_context(3));
        refused.attributes.insert(
            "reason_code".to_string(),
            "upstream_connect_failed".to_string(),
        );
        refused
            .attributes
            .insert("reason_detail".to_string(), "connection refused".to_string());
        consumer.consume(EventEnvelope::from_event(refused));
        let snapshot = store.snapshot();
        assert_eq!(snapshot.upstream_connect_error_count, 1);
        assert_eq!(snapshot.upstream_timeout_count, 0);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.total_connections, 0);
    }

    /// A close without a `reason_code` attribute affects only the connection gauges.
    #[test]
    fn stream_closed_without_reason_code_only_closes_connection() {
        let store = std::sync::Arc::new(ProxyMetricsStore::default());
        let consumer = super::MetricsEventConsumer::new(std::sync::Arc::clone(&store));
        consumer.consume(EventEnvelope::from_event(Event::new(
            EventType::ConnectReceived,
            sample_context(4),
        )));
        consumer.consume(EventEnvelope::from_event(Event::new(
            EventType::StreamClosed,
            sample_context(4),
        )));
        let snapshot = store.snapshot();
        assert_eq!(snapshot.total_connections, 1);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.upstream_connect_error_count, 0);
        assert_eq!(snapshot.upstream_timeout_count, 0);
    }

    #[test]
    fn timeout_reason_matching_is_ascii_case_insensitive() {
        assert!(super::is_timeout_reason("Connection TIMED OUT"));
        assert!(super::is_timeout_reason("handshake Timeout"));
        assert!(!super::is_timeout_reason("connection refused"));
        assert!(!super::is_timeout_reason(""));
    }

    /// Builds a fixed flow context that differs only by `flow_id`.
    fn sample_context(flow_id: u64) -> FlowContext {
        use crate::types::FlowId;
        FlowContext {
            flow_id: FlowId(flow_id),
            client_addr: "127.0.0.1:1234".to_string(),
            server_host: "api.example.com".to_string(),
            server_port: 443,
            protocol: ApplicationProtocol::Http1,
        }
    }
}