#[cfg(feature = "metrics")]
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
/// Identifies a registered stage.
///
/// `MetricsBuilder::stage` hands these out as the zero-based registration index of the
/// stage, so the wrapped value doubles as a row index into the metric storage.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StageId(pub u16);
/// Identifies a registered counter.
///
/// `MetricsBuilder::counter` assigns the zero-based registration index of the counter.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CounterId(pub u32);
/// Identifies a registered gauge.
///
/// `MetricsBuilder::gauge` assigns the zero-based registration index of the gauge.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct GaugeId(pub u32);
/// Identifies a registered histogram.
///
/// `MetricsBuilder::histogram` assigns the zero-based registration index of the histogram.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct HistId(pub u32);
/// The kind of metric a [`MetricDef`] describes.
///
/// `MetricsBuilder` debug-asserts that the kind matches the registration method used.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MetricKind {
/// Registered through `MetricsRegistry::counter`; updated by adding a `u64`.
Counter,
/// Registered through `MetricsRegistry::gauge`; updated by storing an `i64`.
Gauge,
/// Registered through `MetricsRegistry::histogram`; observations are counted per bucket.
HistogramBuckets,
}
/// Static description of a single metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDef {
/// Metric name (the built-in stage metrics use dotted names such as `btx.stage.in_total`).
pub name: String,
/// Unit label (the built-in stage metrics use `"1"` and `"ns"`).
pub unit: String,
/// Human-readable description.
pub description: String,
/// Which kind of metric this definition describes.
pub kind: MetricKind,
}
/// A histogram metric together with its bucket boundaries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramDef {
/// The metric description for this histogram.
pub metric: MetricDef,
/// Bucket upper bounds. Snapshots report one more bucket than there are bounds; with the
/// `metrics` feature enabled that last bucket counts values above every bound.
pub buckets: Vec<u64>,
}
/// Everything registered with a `MetricsBuilder`, in registration order.
///
/// The position of an entry in each list equals the numeric value of the id that was
/// returned when it was registered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSchema {
/// Stage names, indexed by `StageId`.
pub stages: Vec<String>,
/// Counter definitions, indexed by `CounterId`.
pub counters: Vec<MetricDef>,
/// Gauge definitions, indexed by `GaugeId`.
pub gauges: Vec<MetricDef>,
/// Histogram definitions, indexed by `HistId`.
pub histograms: Vec<HistogramDef>,
}
/// A point-in-time copy of all metric values together with the schema describing them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
/// Capture time in nanoseconds since the Unix epoch (clamped to `u64::MAX`; zero if the
/// system clock reports a time before the epoch).
pub captured_unix_ns: u64,
/// Copy of the schema the values below are indexed by.
pub schema: MetricsSchema,
/// Counter values, indexed as `[stage][counter]`.
pub counter_values: Vec<Vec<u64>>,
/// Gauge values, indexed as `[stage][gauge]`.
pub gauge_values: Vec<Vec<i64>>,
/// Histogram bucket counts, indexed as `[histogram][stage][bucket]`; each bucket list has
/// one more entry than the histogram has bounds.
pub histogram_values: Vec<Vec<Vec<u64>>>,
}
/// Registration interface for stages and metrics.
///
/// Each method records one definition and returns the id used to refer to it later.
pub trait MetricsRegistry {
/// Registers a stage by name and returns its id.
fn stage(&mut self, name: impl Into<String>) -> StageId;
/// Registers a counter and returns its id.
fn counter(&mut self, def: MetricDef) -> CounterId;
/// Registers a gauge and returns its id.
fn gauge(&mut self, def: MetricDef) -> GaugeId;
/// Registers a histogram with the given bucket upper bounds and returns its id.
fn histogram(&mut self, def: MetricDef, buckets: &[u64]) -> HistId;
}
/// Collects stage and metric registrations; `build` turns them into a `Metrics` handle.
#[derive(Debug, Default)]
pub struct MetricsBuilder {
// Each list is in registration order; an entry's position is the id that was handed out.
stages: Vec<String>,
counters: Vec<MetricDef>,
gauges: Vec<MetricDef>,
histograms: Vec<HistogramDef>,
}
impl MetricsRegistry for MetricsBuilder {
    /// Registers a stage and returns its id (the zero-based registration index).
    ///
    /// # Panics
    ///
    /// Panics if more stages are registered than a `u16` can index.
    fn stage(&mut self, name: impl Into<String>) -> StageId {
        let id =
            StageId(u16::try_from(self.stages.len()).expect("too many stages for StageId(u16)"));
        self.stages.push(name.into());
        id
    }

    /// Registers a counter and returns its id (the zero-based registration index).
    ///
    /// # Panics
    ///
    /// Panics if more counters are registered than a `u32` can index. In debug builds it
    /// also panics when `def.kind` is not `MetricKind::Counter`.
    fn counter(&mut self, def: MetricDef) -> CounterId {
        debug_assert_eq!(def.kind, MetricKind::Counter);
        // Checked narrowing, like `stage`: a wrapped id would alias an existing counter.
        let id = CounterId(
            u32::try_from(self.counters.len()).expect("too many counters for CounterId(u32)"),
        );
        self.counters.push(def);
        id
    }

    /// Registers a gauge and returns its id (the zero-based registration index).
    ///
    /// # Panics
    ///
    /// Panics if more gauges are registered than a `u32` can index. In debug builds it
    /// also panics when `def.kind` is not `MetricKind::Gauge`.
    fn gauge(&mut self, def: MetricDef) -> GaugeId {
        debug_assert_eq!(def.kind, MetricKind::Gauge);
        let id =
            GaugeId(u32::try_from(self.gauges.len()).expect("too many gauges for GaugeId(u32)"));
        self.gauges.push(def);
        id
    }

    /// Registers a histogram with the given bucket upper bounds and returns its id.
    ///
    /// `buckets` must be in ascending order: the store finds the bucket for an observation
    /// with a binary search, which gives unspecified results on unsorted bounds.
    ///
    /// # Panics
    ///
    /// Panics if more histograms are registered than a `u32` can index. In debug builds it
    /// also panics when `def.kind` is not `MetricKind::HistogramBuckets` or when `buckets`
    /// is not in ascending order.
    fn histogram(&mut self, def: MetricDef, buckets: &[u64]) -> HistId {
        debug_assert_eq!(def.kind, MetricKind::HistogramBuckets);
        debug_assert!(
            buckets.windows(2).all(|pair| pair[0] <= pair[1]),
            "histogram bucket bounds must be in ascending order"
        );
        let id = HistId(
            u32::try_from(self.histograms.len()).expect("too many histograms for HistId(u32)"),
        );
        self.histograms.push(HistogramDef {
            metric: def,
            buckets: buckets.to_vec(),
        });
        id
    }
}
impl MetricsBuilder {
pub fn build(self) -> Metrics {
let store = MetricsStore::new(&self);
Metrics {
schema: Arc::new(MetricsSchema {
stages: self.stages,
counters: self.counters,
gauges: self.gauges,
histograms: self.histograms,
}),
store: Arc::new(store),
}
}
}
/// Handle to a built metrics set: the schema plus the shared value store.
///
/// Both fields are `Arc`s, so cloning the handle shares the same underlying values.
#[derive(Debug, Clone)]
pub struct Metrics {
schema: Arc<MetricsSchema>,
store: Arc<MetricsStore>,
}
impl Metrics {
    /// Returns a shared reference-counted handle to the schema.
    pub fn schema(&self) -> Arc<MetricsSchema> {
        let shared = &self.schema;
        Arc::clone(shared)
    }

    /// Returns a recording handle bound to stage `id` that shares this handle's store.
    pub fn stage(&self, id: StageId) -> StageMetrics {
        let store = Arc::clone(&self.store);
        StageMetrics { stage: id, store }
    }

    /// Captures the current values of every metric for every stage.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let schema: &MetricsSchema = &self.schema;
        self.store.snapshot(schema)
    }
}
/// Recording handle bound to a single stage.
///
/// Holds a shared reference to the value store, so clones record into the same values.
#[derive(Debug, Clone)]
pub struct StageMetrics {
/// The stage every update made through this handle is attributed to.
pub stage: StageId,
store: Arc<MetricsStore>,
}
impl StageMetrics {
    /// Adds `by` to counter `id` for this handle's stage.
    #[inline]
    pub fn inc(&self, id: CounterId, by: u64) {
        let Self { stage, store } = self;
        store.inc(*stage, id, by);
    }

    /// Stores `value` into gauge `id` for this handle's stage.
    #[inline]
    pub fn set(&self, id: GaugeId, value: i64) {
        let Self { stage, store } = self;
        store.set(*stage, id, value);
    }

    /// Records one observation of `value` in histogram `id` for this handle's stage.
    #[inline]
    pub fn observe(&self, id: HistId, value: u64) {
        let Self { stage, store } = self;
        store.observe(*stage, id, value);
    }
}
/// Ids of the standard per-stage metrics created by `StageMetricIds::register`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageMetricIds {
/// Counter `btx.stage.in_total`: work items received by a stage.
pub in_total: CounterId,
/// Counter `btx.stage.out_total`: work items emitted by a stage.
pub out_total: CounterId,
/// Counter `btx.stage.err_total`: errors observed by a stage.
pub err_total: CounterId,
/// Counter `btx.stage.backpressure_total`: backpressure events observed by a stage.
pub backpressure_total: CounterId,
/// Gauge `btx.stage.inflight`: current in-flight work items.
pub inflight: GaugeId,
/// Gauge `btx.stage.queue_len`: current input backlog length (best-effort).
pub queue_len: GaugeId,
/// Histogram `btx.stage.duration_ns`: processing duration per work item, in nanoseconds.
pub duration_ns: HistId,
}
impl StageMetricIds {
pub fn register(reg: &mut impl MetricsRegistry) -> Self {
let in_total = reg.counter(MetricDef {
name: "btx.stage.in_total".into(),
unit: "1".into(),
description: "Work items received by a stage.".into(),
kind: MetricKind::Counter,
});
let out_total = reg.counter(MetricDef {
name: "btx.stage.out_total".into(),
unit: "1".into(),
description: "Work items emitted by a stage.".into(),
kind: MetricKind::Counter,
});
let err_total = reg.counter(MetricDef {
name: "btx.stage.err_total".into(),
unit: "1".into(),
description: "Errors observed by a stage.".into(),
kind: MetricKind::Counter,
});
let backpressure_total = reg.counter(MetricDef {
name: "btx.stage.backpressure_total".into(),
unit: "1".into(),
description: "Backpressure events observed by a stage.".into(),
kind: MetricKind::Counter,
});
let inflight = reg.gauge(MetricDef {
name: "btx.stage.inflight".into(),
unit: "1".into(),
description: "Current in-flight work items for a stage.".into(),
kind: MetricKind::Gauge,
});
let queue_len = reg.gauge(MetricDef {
name: "btx.stage.queue_len".into(),
unit: "1".into(),
description: "Current input backlog length for a stage (best-effort).".into(),
kind: MetricKind::Gauge,
});
let duration_ns = reg.histogram(
MetricDef {
name: "btx.stage.duration_ns".into(),
unit: "ns".into(),
description: "Wall-clock processing duration per work item.".into(),
kind: MetricKind::HistogramBuckets,
},
&[
1_000,
10_000,
100_000,
1_000_000,
10_000_000,
100_000_000,
1_000_000_000,
10_000_000_000,
],
);
Self {
in_total,
out_total,
err_total,
backpressure_total,
inflight,
queue_len,
duration_ns,
}
}
}
/// Convenience recorder for the standard stage metrics of one stage.
///
/// Pairs a stage handle with the ids of the standard metrics so callers can record by
/// field name. This definition is compiled only when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone)]
pub struct StageObserver {
stage: StageMetrics,
ids: StageMetricIds,
}
#[cfg(feature = "metrics")]
impl StageObserver {
    /// Creates an observer that records the metrics named by `ids` through `stage`.
    pub fn new(stage: StageMetrics, ids: StageMetricIds) -> Self {
        StageObserver { stage, ids }
    }

    /// Sets the in-flight gauge.
    #[inline]
    pub fn inflight(&self, value: i64) {
        let gauge = self.ids.inflight;
        self.stage.set(gauge, value);
    }

    /// Sets the queue-length gauge.
    #[inline]
    pub fn queue_len(&self, value: i64) {
        let gauge = self.ids.queue_len;
        self.stage.set(gauge, value);
    }

    /// Adds `by` to the received-items counter.
    #[inline]
    pub fn in_total(&self, by: u64) {
        let counter = self.ids.in_total;
        self.stage.inc(counter, by);
    }

    /// Adds `by` to the emitted-items counter.
    #[inline]
    pub fn out_total(&self, by: u64) {
        let counter = self.ids.out_total;
        self.stage.inc(counter, by);
    }

    /// Adds `by` to the error counter.
    #[inline]
    pub fn err_total(&self, by: u64) {
        let counter = self.ids.err_total;
        self.stage.inc(counter, by);
    }

    /// Adds `by` to the backpressure counter.
    #[inline]
    pub fn backpressure_total(&self, by: u64) {
        let counter = self.ids.backpressure_total;
        self.stage.inc(counter, by);
    }

    /// Records one duration observation of `ns` nanoseconds.
    #[inline]
    pub fn duration_ns(&self, ns: u64) {
        let histogram = self.ids.duration_ns;
        self.stage.observe(histogram, ns);
    }
}
/// Field-less stand-in for the observer, compiled when the `metrics` feature is disabled.
///
/// It offers the same methods as the feature-enabled observer, all of which do nothing.
#[cfg(not(feature = "metrics"))]
#[derive(Debug, Clone, Copy, Default)]
pub struct StageObserver;
#[cfg(not(feature = "metrics"))]
impl StageObserver {
#[inline]
pub fn new(_: StageMetrics, _: StageMetricIds) -> Self {
Self
}
#[inline]
pub fn inflight(&self, _: i64) {}
#[inline]
pub fn queue_len(&self, _: i64) {}
#[inline]
pub fn in_total(&self, _: u64) {}
#[inline]
pub fn out_total(&self, _: u64) {}
#[inline]
pub fn err_total(&self, _: u64) {}
#[inline]
pub fn backpressure_total(&self, _: u64) {}
#[inline]
pub fn duration_ns(&self, _: u64) {}
}
pub mod globals {
    //! Process-wide metrics singleton plus a per-thread "current stage" observer.
    //!
    //! Call [`init`] once with the full set of stage names; the accessors below panic when
    //! used before that.

    use super::{
        Metrics, MetricsBuilder, MetricsRegistry, MetricsSnapshot, StageId, StageMetricIds,
        StageMetrics, StageObserver,
    };
    use std::cell::Cell;
    use std::sync::OnceLock;

    /// Everything owned by the process-wide singleton.
    #[derive(Debug)]
    pub struct GlobalMetrics {
        metrics: Metrics,
        /// Stage name to stage id.
        stage_ids: hashbrown::HashMap<&'static str, StageId>,
        ids: StageMetricIds,
        /// One observer per registered stage, indexed by the numeric stage id.
        observers: Vec<StageObserver>,
    }

    static GLOBALS: OnceLock<GlobalMetrics> = OnceLock::new();

    thread_local! {
        static CURRENT: Cell<Option<&'static StageObserver>> = const { Cell::new(None) };
    }

    /// Builds the singleton value: the standard stage metrics plus one stage per name.
    fn build(stages: &[&'static str]) -> GlobalMetrics {
        let mut builder = MetricsBuilder::default();
        let ids = StageMetricIds::register(&mut builder);

        let mut stage_ids: hashbrown::HashMap<&'static str, StageId> =
            hashbrown::HashMap::with_capacity(stages.len());
        // Stage ids in registration order, so `observers[id]` is the observer of stage `id`.
        let mut registered: Vec<StageId> = Vec::with_capacity(stages.len());
        for &name in stages {
            // Register each distinct name once; a repeated name would otherwise create an
            // extra stage that no lookup by name can ever reach.
            if !stage_ids.contains_key(name) {
                let id = builder.stage(name);
                stage_ids.insert(name, id);
                registered.push(id);
            }
        }

        let metrics = builder.build();
        let observers = registered
            .into_iter()
            .map(|id| StageObserver::new(metrics.stage(id), ids))
            .collect();
        GlobalMetrics {
            metrics,
            stage_ids,
            ids,
            observers,
        }
    }

    /// Returns the singleton, panicking when [`init`] has not run yet.
    fn globals() -> &'static GlobalMetrics {
        GLOBALS.get().expect("metrics::globals::init() not called")
    }

    /// Initializes the singleton with `stages`, or validates `stages` against the existing
    /// one, and returns it.
    ///
    /// The validation runs after `get_or_init`, so it also covers a caller that lost a
    /// concurrent initialization race against a different stage set.
    ///
    /// # Panics
    ///
    /// Panics if the singleton already exists and does not contain every name in `stages`.
    pub fn init(stages: &[&'static str]) -> &'static GlobalMetrics {
        let g = GLOBALS.get_or_init(|| build(stages));
        for &name in stages {
            if !g.stage_ids.contains_key(name) {
                panic!(
                    "metrics already initialized without stage {name}; init must be called once with the full stage set"
                );
            }
        }
        g
    }

    /// Returns a clone of the global [`Metrics`] handle.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called.
    pub fn metrics() -> Metrics {
        globals().metrics.clone()
    }

    /// Captures a snapshot of all global metric values.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called.
    pub fn snapshot() -> MetricsSnapshot {
        globals().metrics.snapshot()
    }

    /// Returns the ids of the standard stage metrics.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called.
    pub fn ids() -> StageMetricIds {
        globals().ids
    }

    /// Looks up the id of the stage registered as `name`.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called or `name` was not part of its stage set.
    pub fn stage_id(name: &'static str) -> StageId {
        globals()
            .stage_ids
            .get(name)
            .copied()
            .unwrap_or_else(|| panic!("unknown stage name: {name}"))
    }

    /// Returns a recording handle for the stage registered as `name`.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called or `name` was not part of its stage set.
    pub fn stage(name: &'static str) -> StageMetrics {
        let g = globals();
        g.metrics.stage(stage_id(name))
    }

    /// Returns the shared observer for the stage registered as `name`.
    ///
    /// # Panics
    ///
    /// Panics if [`init`] has not been called or `name` was not part of its stage set.
    pub fn observer(name: &'static str) -> &'static StageObserver {
        let g = globals();
        let ix = stage_id(name).0 as usize;
        g.observers
            .get(ix)
            .unwrap_or_else(|| panic!("no observer for stage {name} (ix={ix})"))
    }

    /// Makes the observer of stage `name` the current observer of the calling thread.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`observer`].
    pub fn set_current(name: &'static str) {
        let obs = observer(name);
        CURRENT.with(|c| c.set(Some(obs)));
    }

    /// Returns the calling thread's current observer.
    ///
    /// # Panics
    ///
    /// Panics if [`set_current`] has not been called on this thread.
    pub fn current() -> &'static StageObserver {
        CURRENT
            .with(|c| c.get())
            .expect("metrics::globals::set_current() not called")
    }
}
/// Initializes the global metrics singleton with one or more stage-name literals.
///
/// Expands to a call to `$crate::metrics::globals::init` with the literals as a slice and
/// discards the returned reference. A trailing comma is accepted. The expansion refers to
/// this module as `$crate::metrics`.
#[macro_export]
macro_rules! btx_metrics_init {
( $( $stage:literal ),+ $(,)? ) => {{
$crate::metrics::globals::init(&[ $( $stage ),+ ]);
}};
}
/// Selects the stage whose observer subsequent `metric!` calls on this thread record into.
///
/// Expands to a call to `$crate::metrics::globals::set_current` with the given literal,
/// which panics when the singleton is uninitialized or the stage name is unknown.
#[macro_export]
macro_rules! metrics_stage {
($stage:literal) => {{
$crate::metrics::globals::set_current($stage);
}};
}
/// Records a single named field on an observer (the `metrics` feature is enabled).
///
/// `metric_field!(obs, <field> : <value>)` calls the observer method named `<field>`.
/// Gauge fields (`inflight`, `queue_len`) cast the value with `as i64`; counter and
/// histogram fields cast it with `as u64`, so out-of-range values wrap as `as` does.
///
/// Every arm expands to a block, which keeps the macro usable in both statement and
/// expression position (a bare `expr;` expansion is rejected in expression position by
/// the `semicolon_in_expressions_from_macros` lint) and matches the no-metrics variant.
#[cfg(feature = "metrics")]
#[macro_export]
macro_rules! metric_field {
    ($obs:expr, inflight : $value:expr) => {{
        $obs.inflight($value as i64);
    }};
    ($obs:expr, queue_len : $value:expr) => {{
        $obs.queue_len($value as i64);
    }};
    ($obs:expr, in_total : $value:expr) => {{
        $obs.in_total($value as u64);
    }};
    ($obs:expr, out_total : $value:expr) => {{
        $obs.out_total($value as u64);
    }};
    ($obs:expr, err_total : $value:expr) => {{
        $obs.err_total($value as u64);
    }};
    ($obs:expr, backpressure_total : $value:expr) => {{
        $obs.backpressure_total($value as u64);
    }};
    ($obs:expr, duration_ns : $value:expr) => {{
        $obs.duration_ns($value as u64);
    }};
}
/// No-op variant of `metric_field!`, compiled when the `metrics` feature is disabled.
///
/// The observer and value expressions are still evaluated once and passed by reference to
/// `black_box`; nothing is recorded. Any field name is accepted by this variant.
///
/// `black_box` is named through the absolute path `::std::...` because an exported macro
/// is expanded in the caller's scope, where a relative `std::` path could resolve to a
/// local item called `std`.
#[cfg(not(feature = "metrics"))]
#[macro_export]
macro_rules! metric_field {
    ($obs:expr, $name:ident : $value:expr) => {{
        ::std::hint::black_box(&$obs);
        ::std::hint::black_box(&$value);
    }};
}
/// Records one or more fields on the calling thread's current observer (the `metrics`
/// feature is enabled).
///
/// Usage: `metric!({ in_total: 1, duration_ns: elapsed })`. The observer is fetched once
/// through `$crate::metrics::globals::current()`, which panics if no current stage has
/// been set on this thread; each `name: value` pair is then forwarded to `metric_field!`.
#[cfg(feature = "metrics")]
#[macro_export]
macro_rules! metric {
( { $( $name:ident : $value:expr ),+ $(,)? } ) => {{
let obs = $crate::metrics::globals::current();
$( $crate::metric_field!(obs, $name : $value); )+
}};
}
/// No-op variant of `metric!`, compiled when the `metrics` feature is disabled.
///
/// Each value expression is still evaluated once and passed by reference to `black_box`;
/// nothing is recorded and the current observer is never looked up.
///
/// `black_box` is named through the absolute path `::std::...` because an exported macro
/// is expanded in the caller's scope, where a relative `std::` path could resolve to a
/// local item called `std`.
#[cfg(not(feature = "metrics"))]
#[macro_export]
macro_rules! metric {
    ( { $( $name:ident : $value:expr ),+ $(,)? } ) => {{
        $( ::std::hint::black_box(&$value); )+
    }};
}
/// Atomic storage for all metric values (the `metrics` feature is enabled).
///
/// Counters and gauges are flat arrays with one row per stage: the slot for a stage and
/// metric is `stage * per_stage_count + metric`.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct MetricsStore {
// Number of counters per stage (row width of `counters`).
counter_count: usize,
// Number of gauges per stage (row width of `gauges`).
gauge_count: usize,
counters: Box<[AtomicU64]>,
gauges: Box<[AtomicI64]>,
// One entry per registered histogram, indexed by the numeric histogram id.
hists: Box<[HistStore]>,
}
/// Bucket bounds and per-stage bucket counts of one histogram.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct HistStore {
// Bucket upper bounds, copied from the histogram definition.
buckets: Box<[u64]>,
// Flat counts with one row per stage; each row has `buckets.len() + 1` slots, the last
// one counting values above every bound.
counts: Box<[AtomicU64]>,
}
#[cfg(feature = "metrics")]
impl MetricsStore {
    /// Allocates zeroed storage sized for everything registered on `builder`.
    fn new(builder: &MetricsBuilder) -> Self {
        let stage_count = builder.stages.len();
        let counter_count = builder.counters.len();
        let gauge_count = builder.gauges.len();

        let counters: Box<[AtomicU64]> = (0..stage_count.saturating_mul(counter_count))
            .map(|_| AtomicU64::new(0))
            .collect();
        let gauges: Box<[AtomicI64]> = (0..stage_count.saturating_mul(gauge_count))
            .map(|_| AtomicI64::new(0))
            .collect();
        let hists: Box<[HistStore]> = builder
            .histograms
            .iter()
            .map(|h| {
                let buckets: Box<[u64]> = h.buckets.clone().into_boxed_slice();
                // One slot per bound plus a final slot for values above every bound.
                let per_stage = buckets.len().saturating_add(1);
                let counts: Box<[AtomicU64]> = (0..stage_count.saturating_mul(per_stage))
                    .map(|_| AtomicU64::new(0))
                    .collect();
                HistStore { buckets, counts }
            })
            .collect();

        Self {
            counter_count,
            gauge_count,
            counters,
            gauges,
            hists,
        }
    }

    /// Flat index of a counter slot. Performs no range check on `counter_ix`.
    #[inline]
    fn counter_index(&self, stage_ix: usize, counter_ix: usize) -> usize {
        stage_ix
            .saturating_mul(self.counter_count)
            .saturating_add(counter_ix)
    }

    /// Flat index of a gauge slot. Performs no range check on `gauge_ix`.
    #[inline]
    fn gauge_index(&self, stage_ix: usize, gauge_ix: usize) -> usize {
        stage_ix
            .saturating_mul(self.gauge_count)
            .saturating_add(gauge_ix)
    }

    /// Returns the counter slot for a stage, or `None` when either index is out of range.
    ///
    /// The counter index is checked against the row width first: without that check an
    /// out-of-range counter on any stage but the last would land in a later stage's row.
    #[inline]
    fn counter_cell(&self, stage_ix: usize, counter_ix: usize) -> Option<&AtomicU64> {
        if counter_ix >= self.counter_count {
            return None;
        }
        self.counters.get(self.counter_index(stage_ix, counter_ix))
    }

    /// Returns the gauge slot for a stage, or `None` when either index is out of range.
    ///
    /// The gauge index is checked against the row width for the same reason as in
    /// `counter_cell`.
    #[inline]
    fn gauge_cell(&self, stage_ix: usize, gauge_ix: usize) -> Option<&AtomicI64> {
        if gauge_ix >= self.gauge_count {
            return None;
        }
        self.gauges.get(self.gauge_index(stage_ix, gauge_ix))
    }

    /// Adds `by` to a counter. Unknown stage or counter ids are ignored.
    #[inline]
    fn inc(&self, stage: StageId, id: CounterId, by: u64) {
        if let Some(cell) = self.counter_cell(stage.0 as usize, id.0 as usize) {
            cell.fetch_add(by, Ordering::Relaxed);
        }
    }

    /// Stores `value` into a gauge. Unknown stage or gauge ids are ignored.
    #[inline]
    fn set(&self, stage: StageId, id: GaugeId, value: i64) {
        if let Some(cell) = self.gauge_cell(stage.0 as usize, id.0 as usize) {
            cell.store(value, Ordering::Relaxed);
        }
    }

    /// Counts one observation of `value` in a histogram. Unknown stage or histogram ids
    /// are ignored.
    #[inline]
    fn observe(&self, stage: StageId, id: HistId, value: u64) {
        let stage_ix = stage.0 as usize;
        let Some(hist) = self.hists.get(id.0 as usize) else {
            return;
        };
        // `bucket_index` never exceeds the number of bounds, so it always stays inside the
        // stage's row; only an unknown stage can push the flat index out of range.
        let ix = stage_ix
            .saturating_mul(hist.bucket_count_with_inf())
            .saturating_add(hist.bucket_index(value));
        if let Some(cell) = hist.counts.get(ix) {
            cell.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Copies every value into a [`MetricsSnapshot`] shaped by `schema`.
    ///
    /// Each slot is read with its own relaxed atomic load. Slots the store does not have
    /// are reported as zero.
    fn snapshot(&self, schema: &MetricsSchema) -> MetricsSnapshot {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        // Clamp instead of truncating if the nanosecond count does not fit in a `u64`.
        let captured_unix_ns = u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX);
        let stage_count = schema.stages.len();

        let counter_values: Vec<Vec<u64>> = (0..stage_count)
            .map(|stage_ix| {
                (0..schema.counters.len())
                    .map(|counter_ix| {
                        self.counter_cell(stage_ix, counter_ix)
                            .map_or(0, |cell| cell.load(Ordering::Relaxed))
                    })
                    .collect()
            })
            .collect();

        let gauge_values: Vec<Vec<i64>> = (0..stage_count)
            .map(|stage_ix| {
                (0..schema.gauges.len())
                    .map(|gauge_ix| {
                        self.gauge_cell(stage_ix, gauge_ix)
                            .map_or(0, |cell| cell.load(Ordering::Relaxed))
                    })
                    .collect()
            })
            .collect();

        let histogram_values: Vec<Vec<Vec<u64>>> = self
            .hists
            .iter()
            .map(|hist| {
                let bucket_count = hist.bucket_count_with_inf();
                (0..stage_count)
                    .map(|stage_ix| {
                        let base = stage_ix.saturating_mul(bucket_count);
                        (0..bucket_count)
                            .map(|bucket_ix| {
                                hist.counts
                                    .get(base.saturating_add(bucket_ix))
                                    .map_or(0, |cell| cell.load(Ordering::Relaxed))
                            })
                            .collect()
                    })
                    .collect()
            })
            .collect();

        MetricsSnapshot {
            captured_unix_ns,
            schema: schema.clone(),
            counter_values,
            gauge_values,
            histogram_values,
        }
    }
}
#[cfg(feature = "metrics")]
impl HistStore {
    /// Number of buckets per stage: one per bound plus the final overflow bucket.
    #[inline]
    fn bucket_count_with_inf(&self) -> usize {
        let bounds = self.buckets.len();
        bounds.saturating_add(1)
    }

    /// Index of the bucket `value` belongs to: the number of leading bounds that are
    /// strictly below `value`. Relies on the bounds being in ascending order.
    #[inline]
    fn bucket_index(&self, value: u64) -> usize {
        self.buckets.partition_point(|&upper| upper < value)
    }
}
/// Field-less stand-in for the value store, compiled when the `metrics` feature is disabled.
///
/// Updates are discarded and snapshots report zero for every value.
#[cfg(not(feature = "metrics"))]
#[derive(Debug, Default)]
struct MetricsStore;
#[cfg(not(feature = "metrics"))]
impl MetricsStore {
fn new(_: &MetricsBuilder) -> Self {
Self
}
#[inline]
fn inc(&self, _: StageId, _: CounterId, _: u64) {}
#[inline]
fn set(&self, _: StageId, _: GaugeId, _: i64) {}
#[inline]
fn observe(&self, _: StageId, _: HistId, _: u64) {}
fn snapshot(&self, schema: &MetricsSchema) -> MetricsSnapshot {
let captured_unix_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
.min(u64::MAX as u128) as u64;
let stage_count = schema.stages.len();
MetricsSnapshot {
captured_unix_ns,
schema: schema.clone(),
counter_values: vec![vec![0u64; schema.counters.len()]; stage_count],
gauge_values: vec![vec![0i64; schema.gauges.len()]; stage_count],
histogram_values: schema
.histograms
.iter()
.map(|h| vec![vec![0u64; h.buckets.len() + 1]; stage_count])
.collect(),
}
}
}