use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Metric name of the counter tracking dropped inbound messages.
pub(crate) const COUNTER_MESSAGES_DROPPED: &str =
"fugle_marketdata_ws_messages_dropped_total";
/// Metric name of the counter tracking dropped lifecycle events.
pub(crate) const COUNTER_EVENTS_DROPPED: &str =
"fugle_marketdata_ws_events_dropped_total";
/// Cheaply cloneable handle to a shared drop counter.
///
/// Every clone points at the same [`DropCounterInner`], so a bump through any
/// handle is visible through all of them.
#[derive(Clone)]
pub(crate) struct DropCounter {
    inner: Arc<DropCounterInner>,
}

/// State shared by all clones of one [`DropCounter`].
struct DropCounterInner {
    /// In-process tally; always present, regardless of enabled features.
    atomic: AtomicU64,
    /// Handle mirrored into the `metrics` facade; only exists with the
    /// `metrics` feature.
    #[cfg(feature = "metrics")]
    metric: ::metrics::Counter,
}

impl DropCounter {
    /// Creates a counter starting at zero.
    ///
    /// With the `metrics` feature enabled, a `metrics` counter handle named
    /// `metric_name` is also obtained, labelled with `endpoint` and
    /// `client_id`. Without the feature the three arguments are unused.
    pub(crate) fn new(metric_name: &'static str, endpoint: &str, client_id: &str) -> Self {
        #[cfg(feature = "metrics")]
        let metric = ::metrics::counter!(
            metric_name,
            "endpoint" => endpoint.to_owned(),
            "client_id" => client_id.to_owned()
        );
        // Silences unused-argument warnings when the feature is compiled out.
        #[cfg(not(feature = "metrics"))]
        let _ = (metric_name, endpoint, client_id);

        Self {
            inner: Arc::new(DropCounterInner {
                atomic: AtomicU64::new(0),
                #[cfg(feature = "metrics")]
                metric,
            }),
        }
    }

    /// Records one drop: adds 1 to the local tally and, with the `metrics`
    /// feature enabled, to the `metrics` counter as well.
    #[inline]
    pub(crate) fn bump(&self) {
        let shared = &*self.inner;
        shared.atomic.fetch_add(1, Ordering::Relaxed);
        #[cfg(feature = "metrics")]
        shared.metric.increment(1);
    }

    /// Returns the current value of the local tally (relaxed load).
    #[inline]
    pub(crate) fn load(&self) -> u64 {
        let DropCounterInner { atomic, .. } = &*self.inner;
        atomic.load(Ordering::Relaxed)
    }
}
/// Attaches a unit and a help text to both drop-counter metric names.
///
/// Each statement is gated on the `metrics` feature, so without it the body
/// is empty and the call does nothing.
#[inline]
pub(crate) fn describe_drop_counters() {
    #[cfg(feature = "metrics")]
    ::metrics::describe_counter!(
        COUNTER_MESSAGES_DROPPED,
        ::metrics::Unit::Count,
        "Inbound messages dropped because the consumer's `messages()` channel was saturated."
    );
    #[cfg(feature = "metrics")]
    ::metrics::describe_counter!(
        COUNTER_EVENTS_DROPPED,
        ::metrics::Unit::Count,
        "Lifecycle events dropped because the consumer's `connection_events()` channel was saturated."
    );
}
/// Derives the `endpoint` label value from a connection URL.
///
/// Returns the URL's host as an owned string. If `url` cannot be parsed, or
/// parses but carries no host, the result is the empty string.
pub(crate) fn endpoint_label(url: &str) -> String {
    match ::url::Url::parse(url) {
        Ok(parsed) => parsed.host_str().map_or_else(String::new, str::to_owned),
        Err(_) => String::new(),
    }
}
/// Builds the `(messages, events)` pair of drop counters for one connection.
///
/// Metric descriptions are registered first via `describe_drop_counters`.
/// Both counters carry the same labels: the host derived from `config.url`
/// and `config.client_id`, with an empty string standing in for a missing
/// client id.
pub(crate) fn build_drop_counters(
    config: &crate::websocket::ConnectionConfig,
) -> (DropCounter, DropCounter) {
    describe_drop_counters();
    let endpoint = endpoint_label(&config.url);
    // Borrow the configured id instead of cloning it: `DropCounter::new` only
    // needs a `&str` and makes its own owned copies where required.
    let client_id = config.client_id.as_deref().unwrap_or_default();
    (
        DropCounter::new(COUNTER_MESSAGES_DROPPED, &endpoint, client_id),
        DropCounter::new(COUNTER_EVENTS_DROPPED, &endpoint, client_id),
    )
}