use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
/// Upper bound on how many recent duration samples are kept per label for
/// quantile estimation; older samples are overwritten once this is reached.
const MAX_DURATION_SAMPLES: usize = 10_000;
/// Application metrics registry: monotonically increasing counters, one gauge,
/// and per-label duration statistics.
pub struct Metrics {
    /// Updates processed (exported as `bg_updates_total`).
    pub updates_total: AtomicU64,
    /// Errors observed (exported as `bg_errors_total`).
    pub errors_total: AtomicU64,
    /// API calls issued (exported as `bg_api_calls_total`).
    pub api_calls_total: AtomicU64,
    /// API calls avoided (exported as `bg_api_calls_saved_total`).
    pub api_calls_saved: AtomicU64,

    /// Current number of active chats; a gauge, overwritten rather than added to.
    pub active_chats: AtomicU64,

    /// Per-label bounded window of the most recent durations, in microseconds.
    durations_us: DashMap<&'static str, RingBuffer>,
    /// Per-label all-time `(count, sum_in_microseconds)` of recorded durations.
    duration_totals: DashMap<&'static str, (u64, u64)>,
}
struct RingBuffer {
data: Vec<u64>,
pos: usize,
full: bool,
}
impl RingBuffer {
fn new() -> Self {
Self {
data: Vec::with_capacity(256), pos: 0,
full: false,
}
}
fn push(&mut self, value: u64) {
if self.data.len() < MAX_DURATION_SAMPLES {
self.data.push(value);
} else {
self.data[self.pos] = value;
self.pos = (self.pos + 1) % MAX_DURATION_SAMPLES;
self.full = true;
}
}
fn samples(&self) -> &[u64] {
&self.data
}
fn len(&self) -> usize {
self.data.len()
}
fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
impl Metrics {
    /// Creates a registry with every counter at zero and no duration series.
    pub fn new() -> Self {
        Self {
            updates_total: AtomicU64::new(0),
            errors_total: AtomicU64::new(0),
            api_calls_total: AtomicU64::new(0),
            api_calls_saved: AtomicU64::new(0),
            active_chats: AtomicU64::new(0),
            durations_us: DashMap::new(),
            duration_totals: DashMap::new(),
        }
    }

    /// Adds one to `updates_total`.
    pub fn inc_updates(&self) {
        self.updates_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `errors_total`.
    pub fn inc_errors(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `api_calls_total`.
    pub fn inc_api_calls(&self) {
        self.api_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to `api_calls_saved`.
    pub fn inc_api_saved(&self) {
        self.api_calls_saved.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the `active_chats` gauge with `n`.
    pub fn set_active_chats(&self, n: u64) {
        self.active_chats.store(n, Ordering::Relaxed);
    }

    /// Records one observation of `duration` under `label`.
    ///
    /// The value is stored in microseconds in two places: the label's bounded
    /// sample window (used for quantiles) and its all-time count/sum.
    pub fn record_duration(&self, label: &'static str, duration: Duration) {
        // `as_micros` is u128; saturate instead of silently truncating.
        let us = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
        // The entry guard is a temporary dropped at the end of this statement,
        // so this method never holds locks on both maps at once.
        self.durations_us
            .entry(label)
            .or_insert_with(RingBuffer::new)
            .push(us);
        let mut totals = self.duration_totals.entry(label).or_insert((0, 0));
        // Saturate so a long-lived process can neither panic (debug) nor wrap (release).
        totals.0 = totals.0.saturating_add(1);
        totals.1 = totals.1.saturating_add(us);
    }

    /// Starts a [`Timer`] that records its lifetime under `label` when dropped.
    #[must_use]
    pub fn timer(&self, label: &'static str) -> Timer<'_> {
        Timer {
            metrics: self,
            label,
            start: Instant::now(),
        }
    }

    /// Renders all metrics in the Prometheus text exposition format.
    ///
    /// Each duration label becomes a `summary` named
    /// `bg_duration_<sanitized label>`. Its quantiles (0.5/0.95/0.99) come
    /// from the recent-sample window, and `_sum`/`_count` are all-time.
    /// Duration series are emitted in label order.
    pub fn prometheus(&self) -> String {
        let mut out = String::with_capacity(512);
        write_prom_counter(
            &mut out,
            "bg_updates_total",
            "Total updates processed",
            self.updates_total.load(Ordering::Relaxed),
        );
        write_prom_counter(
            &mut out,
            "bg_errors_total",
            "Total errors",
            self.errors_total.load(Ordering::Relaxed),
        );
        write_prom_counter(
            &mut out,
            "bg_api_calls_total",
            "Total API calls",
            self.api_calls_total.load(Ordering::Relaxed),
        );
        write_prom_counter(
            &mut out,
            "bg_api_calls_saved_total",
            "API calls saved by diff/cache",
            self.api_calls_saved.load(Ordering::Relaxed),
        );
        write_prom_gauge(
            &mut out,
            "bg_active_chats",
            "Number of active chats",
            self.active_chats.load(Ordering::Relaxed),
        );
        self.for_each_duration(|label, sorted, _window, count, sum_us| {
            let name = format!("bg_duration_{}", sanitize_prom(label));
            out.push_str(&format!("# HELP {name} Duration of {label} in seconds\n"));
            out.push_str(&format!("# TYPE {name} summary\n"));
            for (quantile, p) in [("0.5", 50), ("0.95", 95), ("0.99", 99)] {
                out.push_str(&format!(
                    "{name}{{quantile=\"{quantile}\"}} {:.6}\n",
                    percentile_sec(sorted, p)
                ));
            }
            out.push_str(&format!(
                "{name}_sum {:.6}\n",
                sum_us as f64 / 1_000_000.0
            ));
            out.push_str(&format!("{name}_count {count}\n"));
        });
        out
    }

    /// Renders a short human-readable report.
    ///
    /// The first line holds the counters. It is followed by one line per
    /// duration label, in label order, with all-time count and average plus
    /// window percentiles in microseconds.
    pub fn summary(&self) -> String {
        let mut out = String::with_capacity(512);
        out.push_str(&format!(
            "Updates: {} | Errors: {} | API calls: {} (saved: {}) | Active chats: {}\n",
            self.updates_total.load(Ordering::Relaxed),
            self.errors_total.load(Ordering::Relaxed),
            self.api_calls_total.load(Ordering::Relaxed),
            self.api_calls_saved.load(Ordering::Relaxed),
            self.active_chats.load(Ordering::Relaxed),
        ));
        self.for_each_duration(|label, sorted, window, count, sum_us| {
            let avg_us = sum_us.checked_div(count).unwrap_or(0);
            out.push_str(&format!(
                " {label}: count={count}, avg={avg_us}µs, \
                 p50={}µs, p95={}µs, p99={}µs (window={window})\n",
                percentile_us(sorted, 50),
                percentile_us(sorted, 95),
                percentile_us(sorted, 99),
            ));
        });
        out
    }

    /// Calls `visit` once for every non-empty duration series, in label order.
    ///
    /// `visit` receives five arguments:
    /// - the label;
    /// - the label's window samples, sorted ascending;
    /// - the window size;
    /// - the all-time observation count;
    /// - the all-time sum in microseconds.
    ///
    /// If no totals entry exists for a label, count and sum fall back to the
    /// window's own values. This can happen when a concurrent
    /// `record_duration` has updated the window but not yet the totals.
    fn for_each_duration<F>(&self, mut visit: F)
    where
        F: FnMut(&'static str, &[u64], usize, u64, u64),
    {
        // Copy each window out first, so the DashMap iterator's shard
        // read-guards are released before sorting up to
        // MAX_DURATION_SAMPLES values per label. Holding the guards would
        // stall concurrent `record_duration` calls.
        let mut snapshots: Vec<(&'static str, Vec<u64>, usize)> = self
            .durations_us
            .iter()
            .filter(|entry| !entry.value().is_empty())
            .map(|entry| {
                let ring = entry.value();
                (*entry.key(), ring.samples().to_vec(), ring.len())
            })
            .collect();
        // DashMap iteration order is arbitrary; sort for stable output.
        snapshots.sort_unstable_by_key(|&(label, _, _)| label);
        for (label, mut sorted, window) in snapshots {
            sorted.sort_unstable();
            let (count, sum_us) = self
                .duration_totals
                .get(label)
                .map(|totals| *totals.value())
                .unwrap_or_else(|| (sorted.len() as u64, sorted.iter().sum()));
            visit(label, &sorted, window, count, sum_us);
        }
    }
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
/// Scope guard returned by [`Metrics::timer`].
///
/// It remembers when it was created. When dropped, it records the time
/// elapsed since then under `label` via [`Metrics::record_duration`].
pub struct Timer<'a> {
    /// Registry that receives the measurement on drop.
    metrics: &'a Metrics,
    /// Series name the duration is recorded under.
    label: &'static str,
    /// Instant at which the guard was created.
    start: Instant,
}
impl Timer<'_> {
    /// Time elapsed since this timer was created.
    ///
    /// Read-only: it neither stops the timer nor records a sample.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
/// On scope exit, records the guard's lifetime as one duration sample.
impl Drop for Timer<'_> {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        self.metrics.record_duration(self.label, elapsed);
    }
}
/// Lazily initialised registry that backs [`metrics`].
static GLOBAL_METRICS: OnceLock<Metrics> = OnceLock::new();

/// Returns the process-wide [`Metrics`] instance, creating it on first call.
pub fn metrics() -> &'static Metrics {
    GLOBAL_METRICS.get_or_init(Metrics::default)
}
/// Returns the `p`-th percentile (0–100) of an ascending-sorted slice.
///
/// Uses the index `round(p / 100 * (len - 1))`, clamped to the last
/// element, so any `p` above 100 yields the maximum. Returns 0 for an
/// empty slice.
fn percentile_us(sorted: &[u64], p: u32) -> u64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    let fraction = f64::from(p) / 100.0;
    let rank = (fraction * last as f64).round() as usize;
    sorted[rank.min(last)]
}
/// [`percentile_us`] of an ascending-sorted microsecond slice, converted to seconds.
fn percentile_sec(sorted: &[u64], p: u32) -> f64 {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    let micros = percentile_us(sorted, p);
    micros as f64 / MICROS_PER_SEC
}
/// Makes `s` safe to embed in a Prometheus metric name.
///
/// ASCII letters, digits and `_` are kept. Every other character, including
/// each non-ASCII character, becomes a single `_`.
fn sanitize_prom(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
/// Appends a Prometheus `counter` (HELP, TYPE and value lines) to `out`.
fn write_prom_counter(out: &mut String, name: &str, help: &str, value: u64) {
    write_prom_metric(out, name, help, "counter", value);
}

/// Appends a Prometheus `gauge` (HELP, TYPE and value lines) to `out`.
fn write_prom_gauge(out: &mut String, name: &str, help: &str, value: u64) {
    write_prom_metric(out, name, help, "gauge", value);
}

/// Shared body of [`write_prom_counter`] and [`write_prom_gauge`].
///
/// Writes the `# HELP`, `# TYPE <kind>` and sample lines for one unlabelled
/// metric. `help` is escaped as the text exposition format requires: `\`
/// becomes `\\` and a newline becomes `\n`, so help text cannot break the
/// line-oriented output.
fn write_prom_metric(out: &mut String, name: &str, help: &str, kind: &str, value: u64) {
    use std::fmt::Write as _;
    // Backslashes first, so the backslash introduced by `\n` is not doubled.
    let help = help.replace('\\', "\\\\").replace('\n', "\\n");
    // Formatting into a String cannot fail, so the fmt::Result is ignored.
    let _ = writeln!(
        out,
        "# HELP {name} {help}\n# TYPE {name} {kind}\n{name} {value}"
    );
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_counters() {
        let m = Metrics::new();
        m.inc_updates();
        m.inc_updates();
        m.inc_errors();
        m.inc_api_calls();
        m.inc_api_calls();
        m.inc_api_calls();
        m.inc_api_saved();
        m.set_active_chats(42);
        assert_eq!(m.updates_total.load(Ordering::Relaxed), 2);
        assert_eq!(m.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(m.api_calls_total.load(Ordering::Relaxed), 3);
        assert_eq!(m.api_calls_saved.load(Ordering::Relaxed), 1);
        assert_eq!(m.active_chats.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn test_record_duration() {
        let m = Metrics::new();
        m.record_duration("test_op", Duration::from_micros(100));
        m.record_duration("test_op", Duration::from_micros(200));
        m.record_duration("test_op", Duration::from_micros(300));
        let ring = m.durations_us.get("test_op").unwrap();
        assert_eq!(ring.value().len(), 3);
        assert_eq!(ring.value().samples(), &[100u64, 200, 300]);
        let totals = m.duration_totals.get("test_op").unwrap();
        assert_eq!(*totals.value(), (3, 600));
    }

    #[test]
    fn test_ring_buffer_wraps_at_capacity() {
        let mut ring = RingBuffer::new();
        let total = MAX_DURATION_SAMPLES as u64 + 5;
        for v in 0..total {
            ring.push(v);
        }
        assert_eq!(ring.len(), MAX_DURATION_SAMPLES);
        // The five oldest slots were overwritten by the five newest values.
        assert_eq!(
            &ring.samples()[..5],
            &[total - 5, total - 4, total - 3, total - 2, total - 1]
        );
        assert_eq!(ring.samples()[5], 5);
    }

    #[test]
    fn test_timer_records() {
        let m = Metrics::new();
        {
            let _t = m.timer("sleep_test");
            std::thread::sleep(Duration::from_millis(5));
        }
        let ring = m.durations_us.get("sleep_test").unwrap();
        assert_eq!(ring.value().len(), 1);
        assert!(
            ring.value().samples()[0] >= 4_000,
            "duration was {} µs",
            ring.value().samples()[0]
        );
    }

    #[test]
    fn test_percentile() {
        let sorted: Vec<u64> = (0..100).collect();
        assert_eq!(percentile_us(&sorted, 50), 50);
        assert_eq!(percentile_us(&sorted, 95), 94);
        assert_eq!(percentile_us(&sorted, 99), 98);
    }

    #[test]
    fn test_percentile_single() {
        let sorted = vec![42];
        assert_eq!(percentile_us(&sorted, 50), 42);
        assert_eq!(percentile_us(&sorted, 99), 42);
    }

    #[test]
    fn test_percentile_empty() {
        let sorted: Vec<u64> = vec![];
        assert_eq!(percentile_us(&sorted, 50), 0);
    }

    #[test]
    fn test_percentile_clamps_out_of_range() {
        let sorted: Vec<u64> = (0..100).collect();
        assert_eq!(percentile_us(&sorted, 0), 0);
        assert_eq!(percentile_us(&sorted, 100), 99);
        assert_eq!(percentile_us(&sorted, 150), 99);
    }

    #[test]
    fn test_prometheus_output() {
        let m = Metrics::new();
        m.inc_updates();
        m.record_duration("handler", Duration::from_micros(500));
        let prom = m.prometheus();
        assert!(prom.contains("bg_updates_total 1"));
        assert!(prom.contains("# TYPE bg_updates_total counter"));
        assert!(prom.contains("bg_errors_total 0"));
        assert!(prom.contains("bg_duration_handler"));
        assert!(prom.contains("quantile=\"0.5\""));
    }

    #[test]
    fn test_prometheus_summary_lines() {
        let m = Metrics::new();
        m.set_active_chats(7);
        m.record_duration("handler", Duration::from_micros(100));
        m.record_duration("handler", Duration::from_micros(300));
        let prom = m.prometheus();
        assert!(prom.contains("# TYPE bg_active_chats gauge\n"));
        assert!(prom.contains("bg_active_chats 7\n"));
        assert!(prom.contains("# TYPE bg_duration_handler summary\n"));
        assert!(prom.contains("bg_duration_handler{quantile=\"0.5\"} 0.000300\n"));
        assert!(prom.contains("bg_duration_handler_sum 0.000400\n"));
        assert!(prom.contains("bg_duration_handler_count 2\n"));
    }

    #[test]
    fn test_summary_output() {
        let m = Metrics::new();
        m.inc_updates();
        m.inc_updates();
        m.inc_errors();
        m.set_active_chats(5);
        m.record_duration("process", Duration::from_micros(100));
        m.record_duration("process", Duration::from_micros(200));
        let s = m.summary();
        assert!(s.contains("Updates: 2"));
        assert!(s.contains("Errors: 1"));
        assert!(s.contains("Active chats: 5"));
        assert!(s.contains("process:"));
        assert!(s.contains("p50="));
    }

    #[test]
    fn test_summary_line_format() {
        let m = Metrics::new();
        m.record_duration("process", Duration::from_micros(100));
        m.record_duration("process", Duration::from_micros(200));
        let s = m.summary();
        assert!(s.contains(
            " process: count=2, avg=150µs, p50=200µs, p95=200µs, p99=200µs (window=2)\n"
        ));
    }

    #[test]
    fn test_global_singleton() {
        let a = metrics();
        let b = metrics();
        assert!(std::ptr::eq(a, b));
    }

    #[test]
    fn test_sanitize_prom() {
        assert_eq!(sanitize_prom("hello-world.foo"), "hello_world_foo");
        assert_eq!(sanitize_prom("ok_name"), "ok_name");
    }
}