use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use metrics::{
Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder,
SharedString, Unit,
};
/// Counter cell: a single `AtomicU64` holding the running total.
struct SdkCounter(AtomicU64);

impl CounterFn for SdkCounter {
    /// Adds `value` to the running total.
    fn increment(&self, value: u64) {
        let Self(total) = self;
        total.fetch_add(value, Ordering::Relaxed);
    }

    /// Raises the total to `value` when it is currently lower; a smaller
    /// `value` leaves the total untouched.
    fn absolute(&self, value: u64) {
        let Self(total) = self;
        total.fetch_max(value, Ordering::Relaxed);
    }
}
/// Gauge cell: an `f64` kept as its raw bit pattern inside an `AtomicU64`.
struct SdkGauge(AtomicU64);

impl GaugeFn for SdkGauge {
    /// Atomically adds `value` to the stored float.
    fn increment(&self, value: f64) {
        // The closure always yields `Some`, so `fetch_update` keeps retrying
        // the compare-exchange until it lands and can never return `Err`.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
    }

    /// Atomically subtracts `value` by adding its negation.
    fn decrement(&self, value: f64) {
        self.increment(-value);
    }

    /// Overwrites the stored float with `value`.
    fn set(&self, value: f64) {
        let bits = value.to_bits();
        self.0.store(bits, Ordering::Relaxed);
    }
}
/// Histogram cell that tracks only how many samples were recorded and their
/// running sum.
struct SdkHistogram {
    /// Number of recorded samples.
    count: AtomicU64,
    /// Sum of all samples, stored as the bit pattern of an `f64`.
    sum: AtomicU64,
}

impl HistogramFn for SdkHistogram {
    /// Records one sample: bumps `count`, then atomically adds `value` to `sum`.
    fn record(&self, value: f64) {
        self.count.fetch_add(1, Ordering::Relaxed);
        // The closure always yields `Some`, so `fetch_update` keeps retrying
        // the compare-exchange until it lands and can never return `Err`.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
    }
}
/// Registry of every metric cell created by the SDK recorder.
///
/// Each map sits behind its own `Arc<RwLock<..>>`, so cloning the storage
/// produces another handle onto the same maps rather than a copy of them.
#[derive(Clone)]
pub(crate) struct MetricsStorage {
    /// Counter cells keyed by rendered metric name.
    counters: Arc<RwLock<HashMap<String, Arc<SdkCounter>>>>,
    /// Gauge cells keyed by rendered metric name.
    gauges: Arc<RwLock<HashMap<String, Arc<SdkGauge>>>>,
    /// Histogram cells keyed by rendered metric name.
    histograms: Arc<RwLock<HashMap<String, Arc<SdkHistogram>>>>,
}

impl MetricsStorage {
    /// Creates a storage whose three maps are empty.
    pub fn new() -> Self {
        Self {
            counters: Arc::new(RwLock::new(HashMap::new())),
            gauges: Arc::new(RwLock::new(HashMap::new())),
            histograms: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Returns the current value of every registered metric as a flat map.
    ///
    /// * Counters are reported under their name, converted to `f64` (values
    ///   above 2^53 lose precision in that conversion).
    /// * Gauges are reported under their name; a gauge that shares a name
    ///   with a counter overwrites the counter's entry.
    /// * Histograms are reported as `<name>.count` and, once at least one
    ///   sample exists, `<name>.avg` (sum divided by count).
    ///
    /// A poisoned lock is recovered instead of skipped: the maps only hold
    /// atomics, which remain readable after a panic in another thread, so
    /// dropping a whole metric family from the report would hide valid data.
    pub fn snapshot(&self) -> HashMap<String, f64> {
        let mut result = HashMap::new();

        // Each guard lives in its own scope so only one read lock is held at
        // a time.
        {
            let counters = self
                .counters
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for (name, cell) in counters.iter() {
                result.insert(name.clone(), cell.0.load(Ordering::Relaxed) as f64);
            }
        }

        {
            let gauges = self
                .gauges
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for (name, cell) in gauges.iter() {
                result.insert(
                    name.clone(),
                    f64::from_bits(cell.0.load(Ordering::Relaxed)),
                );
            }
        }

        {
            let histograms = self
                .histograms
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            for (name, cell) in histograms.iter() {
                let count = cell.count.load(Ordering::Relaxed);
                let sum = f64::from_bits(cell.sum.load(Ordering::Relaxed));
                result.insert(format!("{name}.count"), count as f64);
                // No average for an empty histogram: avoids emitting 0/0 = NaN.
                if count > 0 {
                    result.insert(format!("{name}.avg"), sum / count as f64);
                }
            }
        }

        result
    }
}
/// Renders a metric key as `name` when it has no labels, or as
/// `name{k1=v1,k2=v2}` with the labels in the order the key yields them.
fn key_to_string(key: &Key) -> String {
    let mut labels = key.labels();

    // Unlabelled keys render as the bare name, with no braces.
    let Some(first) = labels.next() else {
        return key.name().to_string();
    };

    let mut rendered = format!("{}{{{}={}", key.name(), first.key(), first.value());
    for label in labels {
        rendered.push(',');
        rendered.push_str(label.key());
        rendered.push('=');
        rendered.push_str(label.value());
    }
    rendered.push('}');
    rendered
}
/// `metrics::Recorder` implementation that stores every registered metric in
/// a [`MetricsStorage`].
pub(crate) struct SdkRecorder {
    /// Storage that receives every cell this recorder creates.
    storage: MetricsStorage,
}

impl Recorder for SdkRecorder {
    /// No-op: descriptions and units are not stored.
    fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _desc: SharedString) {}

    /// No-op: descriptions and units are not stored.
    fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _desc: SharedString) {}

    /// No-op: descriptions and units are not stored.
    fn describe_histogram(&self, _key: KeyName, _unit: Option<Unit>, _desc: SharedString) {}

    /// Returns a handle to the counter registered under `key`, creating a
    /// zeroed cell on first use. Registering the same key again returns a
    /// handle onto the same cell.
    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
        let name = key_to_string(key);
        // Recover from a poisoned lock instead of panicking: a panic in one
        // thread must not make every later metric registration panic too.
        let mut map = self
            .storage
            .counters
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let arc = map
            .entry(name)
            .or_insert_with(|| Arc::new(SdkCounter(AtomicU64::new(0))));
        Counter::from_arc(arc.clone())
    }

    /// Returns a handle to the gauge registered under `key`, creating a cell
    /// holding `0.0` (all-zero bits) on first use.
    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
        let name = key_to_string(key);
        // Same poison recovery as `register_counter`.
        let mut map = self
            .storage
            .gauges
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let arc = map
            .entry(name)
            .or_insert_with(|| Arc::new(SdkGauge(AtomicU64::new(0))));
        Gauge::from_arc(arc.clone())
    }

    /// Returns a handle to the histogram registered under `key`, creating an
    /// empty cell (count 0, sum 0.0) on first use.
    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
        let name = key_to_string(key);
        // Same poison recovery as `register_counter`.
        let mut map = self
            .storage
            .histograms
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let arc = map.entry(name).or_insert_with(|| {
            Arc::new(SdkHistogram {
                count: AtomicU64::new(0),
                sum: AtomicU64::new(0),
            })
        });
        Histogram::from_arc(arc.clone())
    }
}
/// Storage handed out by [`install`]; initialised at most once per process.
static SDK_STORAGE: std::sync::OnceLock<MetricsStorage> = std::sync::OnceLock::new();

/// Installs the SDK recorder as the global `metrics` recorder and returns the
/// storage it writes to. Every call in the process returns a handle onto the
/// same storage.
///
/// Initialisation runs inside `OnceLock::get_or_init`, so concurrent callers
/// block until the first one has both registered the recorder and published
/// its storage. This closes the window in which a second caller could see
/// `set_global_recorder` fail while the cache was still empty and walk away
/// with a detached storage that never receives any metrics.
///
/// If another recorder is already global, the storage returned is detached:
/// it stays empty and metrics flow to the other recorder. That detached
/// storage is cached as well, so the explanatory debug log is emitted once
/// rather than on every call.
pub(crate) fn install() -> MetricsStorage {
    SDK_STORAGE
        .get_or_init(|| {
            let storage = MetricsStorage::new();
            let recorder = SdkRecorder {
                storage: storage.clone(),
            };
            match metrics::set_global_recorder(recorder) {
                Ok(()) => storage,
                // `SDK_STORAGE` is still unset while this initialiser runs, so
                // the helper takes its logging path and hands `storage` back.
                Err(_) => recorder_unavailable_fallback(storage, &SDK_STORAGE),
            }
        })
        .clone()
}
/// Picks the storage to return when the SDK recorder could not be installed
/// as the global recorder.
///
/// If `cache` already holds a storage, a clone of it is returned and `local`
/// is dropped. Otherwise a debug message is logged and `local` is returned
/// unchanged.
fn recorder_unavailable_fallback(
    local: MetricsStorage,
    cache: &std::sync::OnceLock<MetricsStorage>,
) -> MetricsStorage {
    match cache.get() {
        Some(shared) => shared.clone(),
        None => {
            tracing::debug!(
                "Strike48 SDK metrics recorder not installed: another \
                 recorder is already global. Connector metrics via \
                 metrics::counter!/gauge!/histogram! will flow to the \
                 application's recorder, not to ConnectorRunner::report()."
            );
            local
        }
    }
}
// Unit tests for the recorder, the storage snapshot, key rendering and the
// install/fallback logic. Most tests build a private storage + recorder pair,
// so they do not depend on the process-global recorder.
#[cfg(test)]
mod tests {
use super::*;
// Returns a fresh, empty storage.
fn make_storage() -> MetricsStorage {
MetricsStorage::new()
}
// Wraps `storage` in a recorder without installing it globally.
fn make_recorder(storage: MetricsStorage) -> SdkRecorder {
SdkRecorder { storage }
}
// Builds a key from a bare name.
fn test_key(name: &str) -> Key {
Key::from_name(name.to_string())
}
// Builds a key from a name plus `(label_key, label_value)` pairs.
fn test_key_with_labels(name: &str, labels: &[(&str, &str)]) -> Key {
let kvs: Vec<metrics::Label> = labels
.iter()
.map(|(k, v)| metrics::Label::new(k.to_string(), v.to_string()))
.collect();
Key::from_parts(name.to_string(), kvs)
}
// Two increments on one counter handle add up in the snapshot.
#[test]
fn test_counter_increment() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("my.counter");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let counter = recorder.register_counter(&key, &meta);
counter.increment(5);
counter.increment(3);
let snap = storage.snapshot();
assert_eq!(snap["my.counter"], 8.0);
}
// `absolute` on a fresh (zero) counter raises it to the given value.
#[test]
fn test_counter_absolute() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("abs.counter");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let counter = recorder.register_counter(&key, &meta);
counter.absolute(100);
let snap = storage.snapshot();
assert_eq!(snap["abs.counter"], 100.0);
}
// `set` stores the exact float value.
#[test]
fn test_gauge_set() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("my.gauge");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let gauge = recorder.register_gauge(&key, &meta);
gauge.set(42.5);
let snap = storage.snapshot();
assert_eq!(snap["my.gauge"], 42.5);
}
// set(10) + 5 - 3 must read back as 12.
#[test]
fn test_gauge_increment_decrement() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("inc.gauge");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let gauge = recorder.register_gauge(&key, &meta);
gauge.set(10.0);
gauge.increment(5.0);
gauge.decrement(3.0);
let snap = storage.snapshot();
assert_eq!(snap["inc.gauge"], 12.0);
}
// Three samples produce `<name>.count` == 3 and `<name>.avg` == their mean.
#[test]
fn test_histogram_records() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("my.hist");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let hist = recorder.register_histogram(&key, &meta);
hist.record(10.0);
hist.record(20.0);
hist.record(30.0);
let snap = storage.snapshot();
assert_eq!(snap["my.hist.count"], 3.0);
assert_eq!(snap["my.hist.avg"], 20.0);
}
// Keys with the same name but different label values are tracked as
// separate counters, reported as `name{label=value}`.
#[test]
fn test_labeled_metrics() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key_get = test_key_with_labels("http.requests", &[("method", "GET")]);
let key_post = test_key_with_labels("http.requests", &[("method", "POST")]);
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let c_get = recorder.register_counter(&key_get, &meta);
let c_post = recorder.register_counter(&key_post, &meta);
c_get.increment(10);
c_post.increment(3);
let snap = storage.snapshot();
assert_eq!(snap["http.requests{method=GET}"], 10.0);
assert_eq!(snap["http.requests{method=POST}"], 3.0);
}
// Registering the same key twice yields handles onto one shared cell.
#[test]
fn test_same_key_returns_same_handle() {
let storage = make_storage();
let recorder = make_recorder(storage.clone());
let key = test_key("shared.counter");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let c1 = recorder.register_counter(&key, &meta);
let c2 = recorder.register_counter(&key, &meta);
c1.increment(5);
c2.increment(3);
let snap = storage.snapshot();
assert_eq!(snap["shared.counter"], 8.0);
}
// A storage with nothing registered snapshots to an empty map.
#[test]
fn test_empty_snapshot() {
let storage = make_storage();
let snap = storage.snapshot();
assert!(snap.is_empty());
}
// An unlabelled key renders as just its name.
#[test]
fn test_key_to_string_no_labels() {
let key = test_key("simple.metric");
assert_eq!(key_to_string(&key), "simple.metric");
}
// A labelled key renders as `name{...}` containing every `k=v` pair.
// NOTE(review): this checks only the prefix and substrings, not the exact
// rendered string or the separator.
#[test]
fn test_key_to_string_with_labels() {
let key = test_key_with_labels("req", &[("method", "GET"), ("status", "200")]);
let s = key_to_string(&key);
assert!(s.starts_with("req{"));
assert!(s.contains("method=GET"));
assert!(s.contains("status=200"));
}
// When the cache already holds a storage, the fallback must return a handle
// onto that cached storage and not the caller's local one.
#[test]
fn recorder_unavailable_fallback_returns_winner_storage_on_lost_race() {
let cache: std::sync::OnceLock<MetricsStorage> = std::sync::OnceLock::new();
let winner = MetricsStorage::new();
if cache.set(winner.clone()).is_err() {
unreachable!("OnceLock was just constructed");
}
let loser_local = MetricsStorage::new();
let resolved = recorder_unavailable_fallback(loser_local, &cache);
// Write through a recorder bound to `winner`; the value is visible through
// `resolved` only if both handles share the same maps.
let recorder = SdkRecorder {
storage: winner.clone(),
};
let key = test_key("strike48.test.lost_race");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
let counter = recorder.register_counter(&key, &meta);
counter.increment(7);
let snap = resolved.snapshot();
assert_eq!(
snap.get("strike48.test.lost_race").copied(),
Some(7.0),
"fallback storage must alias the cached winner storage"
);
}
// With an empty cache the fallback hands back the caller's local storage.
#[test]
fn recorder_unavailable_fallback_returns_local_when_cache_empty() {
let cache: std::sync::OnceLock<MetricsStorage> = std::sync::OnceLock::new();
let local = MetricsStorage::new();
let resolved = recorder_unavailable_fallback(local.clone(), &cache);
// Write through a recorder bound to `local` and read it back via `resolved`.
let recorder = SdkRecorder {
storage: local.clone(),
};
let key = test_key("strike48.test.local_fallback");
let meta = Metadata::new(module_path!(), metrics::Level::INFO, None);
recorder.register_counter(&key, &meta).increment(3);
let snap = resolved.snapshot();
assert_eq!(snap["strike48.test.local_fallback"], 3.0);
}
// Calls `install()` twice, records through the global macro, and compares
// the two snapshots. This is the only test that touches process-global state.
// NOTE(review): the equality also holds when both snapshots are empty (for
// example if another recorder was already global) — consider additionally
// asserting that the counter is present.
#[test]
fn install_is_idempotent_within_a_process() {
let first = install();
let second = install();
metrics::counter!("strike48.test.idempotent_install").increment(7);
let s1 = first.snapshot();
let s2 = second.snapshot();
assert_eq!(s1, s2, "second install must share storage with first");
}
}