use std::sync::OnceLock;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::{Family, MetricConstructor};
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
/// Label set attached to each sample of the throughput counter family.
///
/// Field order is significant: it determines the order in which the derived
/// `EncodeLabelSet` implementation emits the labels.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct CodeLabel {
    /// Numeric code of the response (presumably the HTTP status code; confirm at the call site).
    pub code: u16,
    /// Name of the endpoint the sample belongs to.
    pub endpoint: String,
}
/// Label set identifying a `(stage, connection)` pair.
///
/// Used as the key of the duration, batch-size and batch-duration histogram
/// families in `Metrics`.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct StageConnectionLabel {
    /// Name of the stage the observation belongs to.
    pub stage: String,
    /// Name of the connection the observation belongs to.
    pub connection: String,
}
/// Bundle of all metric handles exposed by the service.
///
/// Each field is a cheap-to-clone handle; `Metrics::init_with_namespace`
/// registers a clone of every field in the Prometheus registry.
#[derive(Debug)]
pub(crate) struct Metrics {
    /// Service inference endpoint throughput, labelled by code and endpoint.
    pub(crate) throughput: Family<CodeLabel, Counter>,
    /// Process duration for each connection in each stage; its buckets are
    /// produced by a `CustomHistogramBuilder`.
    pub(crate) duration: Family<StageConnectionLabel, Histogram, CustomHistogramBuilder>,
    /// Batch size for each connection in each stage.
    pub(crate) batch_size: Family<StageConnectionLabel, Histogram>,
    /// Dynamic batching duration for each connection in each stage.
    pub(crate) batch_duration: Family<StageConnectionLabel, Histogram>,
    /// Remaining tasks for the whole service.
    pub(crate) remaining_task: Gauge,
}
/// Constructor object for histograms whose number of buckets is chosen at runtime.
///
/// A plain `fn() -> Histogram` constructor cannot carry state, so the bucket
/// count is stored here and consumed by the `MetricConstructor` implementation.
#[derive(Clone)]
pub(crate) struct CustomHistogramBuilder {
    /// Number of exponential buckets each created histogram will have.
    length: u16,
}
impl MetricConstructor<Histogram> for CustomHistogramBuilder {
    /// Creates a histogram with `self.length` exponential buckets that start
    /// at `1e-3` and double at every step.
    fn new_metric(&self) -> Histogram {
        let bucket_bounds = exponential_buckets(1e-3, 2.0, self.length);
        Histogram::new(bucket_bounds)
    }
}
impl Metrics {
pub(crate) fn global() -> &'static Metrics {
METRICS.get().expect("Metrics is not initialized")
}
pub(crate) fn new(timeout: u64) -> Self {
let builder = CustomHistogramBuilder {
length: (timeout as f64).log2().ceil() as u16 + 1,
};
Self {
throughput: Family::<CodeLabel, Counter>::default(),
duration:
Family::<StageConnectionLabel, Histogram, CustomHistogramBuilder>::new_with_constructor(
builder,
), batch_size: Family::<StageConnectionLabel, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(1f64, 2f64, 10)) }),
batch_duration: Family::<StageConnectionLabel, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(1e-3f64, 2f64, 13)) }),
remaining_task: Gauge::default(),
}
}
pub(crate) fn init_with_namespace(namespace: &str, timeout: u64) -> Self {
DURATION_LABEL
.set(StageConnectionLabel {
stage: "total".to_string(),
connection: "total".to_string(),
})
.unwrap();
let mut registry = <Registry>::default();
let metrics = Metrics::new(timeout);
registry.register(
format!("{namespace}_throughput"),
"service inference endpoint throughput",
metrics.throughput.clone(),
);
registry.register(
format!("{namespace}_process_duration_second"),
"process duration for each connection in each stage",
metrics.duration.clone(),
);
registry.register(
format!("{namespace}_batch_size"),
"batch size for each connection in each stage",
metrics.batch_size.clone(),
);
registry.register(
format!("{namespace}_batch_duration_second"),
"dynamic batching duration for each connection in each stage",
metrics.batch_duration.clone(),
);
registry.register(
format!("{namespace}_remaining_task"),
"remaining tasks for the whole service",
metrics.remaining_task.clone(),
);
REGISTRY.set(registry).unwrap();
metrics
}
}
/// Process-wide metrics instance read by `Metrics::global`, which panics while this is unset.
/// Nothing in this part of the file sets it; it is presumably initialized by
/// the caller of `Metrics::init_with_namespace`.
pub(crate) static METRICS: OnceLock<Metrics> = OnceLock::new();
/// Registry holding every registered metric; set once by `Metrics::init_with_namespace`.
pub(crate) static REGISTRY: OnceLock<Registry> = OnceLock::new();
/// Label with both `stage` and `connection` equal to `"total"`; set once by
/// `Metrics::init_with_namespace`.
pub(crate) static DURATION_LABEL: OnceLock<StageConnectionLabel> = OnceLock::new();