use std::{
collections::HashMap,
marker::PhantomData,
sync::{Arc, Mutex},
};
use prometheus::{
core::{
Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec,
GenericGauge, GenericGaugeVec,
},
CounterVec, Encoder, GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounterVec,
IntGaugeVec, Opts, Registry,
};
use tokio::task::JoinHandle;
use warp::Filter;
/// Static description of a labelled metric family.
///
/// `T` is the collector type the descriptor is meant for (it is only used as a
/// type-level tag) and `N` is the number of label names.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MetricDescriptor<T: Collector, const N: usize> {
    /// Optional subsystem, placed ahead of `name` when the qualified name is built.
    subsystem: Option<&'static str>,
    /// Bare metric name.
    name: &'static str,
    /// Help text handed to prometheus when the metric is created.
    help: &'static str,
    /// Names of the `N` variable labels.
    labels: [&'static str; N],
    /// Marker only; no `T` value is ever stored.
    // NOTE(review): a raw-pointer marker opts this type out of the `Send`/`Sync`
    // auto traits — confirm that is intended.
    _phantom: PhantomData<*const T>,
}
/// Returns `true` when every byte of `name` is an ASCII letter, an ASCII
/// digit, `_` or `:`.
///
/// The check is purely per-byte: the empty string and names that begin with a
/// digit are accepted. Written with an index loop so it can run in `const`
/// contexts.
const fn validate_prometheus_name(name: &'static str) -> bool {
    let bytes = name.as_bytes();
    let mut idx = 0;
    while idx < bytes.len() {
        let byte = bytes[idx];
        let allowed = byte.is_ascii_alphanumeric() || byte == b'_' || byte == b':';
        if !allowed {
            return false;
        }
        idx += 1;
    }
    true
}
/// Panics unless `name` passes `validate_prometheus_name`.
///
/// Being a `const fn`, the panic surfaces as a compile error when the call is
/// evaluated in a `const` context.
const fn compile_validate_prometheus_name(name: &'static str) {
    if validate_prometheus_name(name) {
        return;
    }
    panic!(
        "invalid prometheus name. Prometheus names must be alphanumeric, plus '_' or ':'. Please check your name, help, and labels"
    );
}
impl<T, const N: usize> MetricDescriptor<T, N>
where
    T: Collector,
{
    /// Builds a descriptor, validating the identifier-like parts.
    ///
    /// `subsystem` (when present), `name` and every entry of `labels` must
    /// consist only of ASCII alphanumerics, `_` or `:`.
    ///
    /// `help` is stored as given and is *not* validated: help text is
    /// free-form prose, and running it through the name check rejected any
    /// description containing a space or punctuation.
    ///
    /// # Panics
    ///
    /// Panics (a compile error when evaluated in a `const` context) if
    /// `subsystem`, `name` or any label contains a disallowed character.
    pub const fn new(
        subsystem: Option<&'static str>,
        name: &'static str,
        help: &'static str,
        labels: [&'static str; N],
    ) -> Self {
        if let Some(sub) = subsystem {
            compile_validate_prometheus_name(sub);
        }
        compile_validate_prometheus_name(name);

        // Index loop rather than an iterator so the function stays `const`.
        let mut i = 0;
        while i < N {
            compile_validate_prometheus_name(labels[i]);
            i += 1;
        }

        Self {
            subsystem,
            name,
            help,
            labels,
            _phantom: PhantomData,
        }
    }

    /// Returns `subsystem_name` when a subsystem is set, otherwise just the name.
    pub fn qualified_name(&self) -> String {
        match self.subsystem {
            Some(subsystem) => [subsystem, self.name].join("_"),
            None => self.name.to_owned(),
        }
    }

    /// Bare metric name, without subsystem.
    pub const fn name(&self) -> &str {
        self.name
    }

    /// Subsystem, if one was given.
    pub const fn subsystem(&self) -> Option<&str> {
        self.subsystem
    }

    /// Help text.
    pub const fn help(&self) -> &str {
        self.help
    }

    /// The `N` label names, in declaration order.
    pub const fn label_names(&self) -> [&str; N] {
        self.labels
    }
}
// `Clone` and `Copy` are written by hand rather than derived: a derive would
// require `T: Clone` / `T: Copy`, although no `T` value is stored.
impl<T, const N: usize> Copy for MetricDescriptor<T, N> where T: Collector {}

impl<T, const N: usize> Clone for MetricDescriptor<T, N>
where
    T: Collector,
{
    /// Bitwise copy; every field is itself `Copy`.
    fn clone(&self) -> Self {
        *self
    }
}
/// Converts a descriptor into prometheus `Opts` carrying its name, help and,
/// when present, its subsystem. No namespace or constant labels are set here.
impl<T, const N: usize> From<&MetricDescriptor<T, N>> for Opts
where
    T: Collector,
{
    fn from(descriptor: &MetricDescriptor<T, N>) -> Self {
        let base = Opts::new(descriptor.name, descriptor.help);
        match descriptor.subsystem() {
            Some(subsystem) => base.subsystem(subsystem),
            None => base,
        }
    }
}
impl<T, const N: usize> From<&MetricDescriptor<T, N>> for HistogramOpts
where
T: Collector,
{
fn from(m: &MetricDescriptor<T, N>) -> Self {
Opts::from(m).into()
}
}
/// A handle onto a labelled metric family with exactly `N` labels.
pub trait MetricHandle<const N: usize>: Sized {
    /// Concrete metric type produced once label values are supplied.
    type Metric: Collector;

    /// The `Metrics` instance this handle was obtained from.
    fn core(&self) -> &Metrics;

    /// The descriptor this handle was created with.
    fn descriptor(&self) -> MetricDescriptor<Self::Metric, N>;

    /// Full metric name as computed by `Metrics::full_name` for this
    /// handle's descriptor.
    fn full_name(&self) -> String {
        let descriptor = self.descriptor();
        self.core().full_name(&descriptor)
    }

    /// Resolves the metric for the given label values, one per label name.
    fn metric(&self, label_values: [&str; N]) -> Self::Metric;
}
/// Generic gauge-vec handle; exposed through the `IntGaugeVecHandle` and
/// `GaugeVecHandle` aliases.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Gvh<'a, T: Atomic, const N: usize> {
    /// Owning `Metrics` instance.
    core: &'a Metrics,
    /// Descriptor the handle was created from.
    descriptor: MetricDescriptor<<Self as MetricHandle<N>>::Metric, N>,
    /// Underlying prometheus gauge vector.
    vec: GenericGaugeVec<T>,
}
impl<'a, T, const N: usize> MetricHandle<N> for Gvh<'a, T, N>
where
    T: Atomic,
{
    type Metric = GenericGauge<T>;

    fn core(&self) -> &Metrics {
        self.core
    }

    fn descriptor(&self) -> MetricDescriptor<Self::Metric, N> {
        self.descriptor
    }

    /// Looks up the gauge for `label_values`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying vector rejects the label values.
    // NOTE(review): the expectation is that `N` always equals the vector's
    // label count; verify this holds when two descriptors share a name.
    fn metric(&self, label_values: [&str; N]) -> Self::Metric {
        let gauge = self.vec.get_metric_with_label_values(&label_values[..]);
        gauge.expect("enforced by type system")
    }
}
/// Generic counter-vec handle; exposed through the `IntCounterVecHandle` and
/// `CounterVecHandle` aliases.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct Cvh<'a, T: Atomic, const N: usize> {
    /// Owning `Metrics` instance.
    core: &'a Metrics,
    /// Descriptor the handle was created from.
    descriptor: MetricDescriptor<<Self as MetricHandle<N>>::Metric, N>,
    /// Underlying prometheus counter vector.
    vec: GenericCounterVec<T>,
}
impl<'a, T, const N: usize> MetricHandle<N> for Cvh<'a, T, N>
where
    T: Atomic,
{
    type Metric = GenericCounter<T>;

    fn core(&self) -> &Metrics {
        self.core
    }

    fn descriptor(&self) -> MetricDescriptor<Self::Metric, N> {
        self.descriptor
    }

    /// Looks up the counter for `label_values`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying vector rejects the label values.
    // NOTE(review): the expectation is that `N` always equals the vector's
    // label count; verify this holds when two descriptors share a name.
    fn metric(&self, label_values: [&str; N]) -> Self::Metric {
        let counter = self.vec.get_metric_with_label_values(&label_values[..]);
        counter.expect("enforced by type system")
    }
}
pub struct HistogramVecHandle<'a, const N: usize> {
core: &'a Metrics,
descriptor: MetricDescriptor<<Self as MetricHandle<N>>::Metric, N>,
buckets: Vec<f64>,
vec: HistogramVec,
}
impl<'a, const N: usize> HistogramVecHandle<'a, N> {
    /// Bucket boundaries stored on this handle.
    pub fn buckets(&self) -> &[f64] {
        &self.buckets
    }
}
impl<'a, const N: usize> MetricHandle<N> for HistogramVecHandle<'a, N> {
    type Metric = Histogram;

    fn core(&self) -> &Metrics {
        self.core
    }

    fn descriptor(&self) -> MetricDescriptor<Self::Metric, N> {
        self.descriptor
    }

    /// Looks up the histogram for `label_values`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying vector rejects the label values.
    // NOTE(review): the expectation is that `N` always equals the vector's
    // label count; verify this holds when two descriptors share a name.
    fn metric(&self, label_values: [&str; N]) -> Self::Metric {
        let histogram = self.vec.get_metric_with_label_values(&label_values[..]);
        histogram.expect("enforced by type system")
    }
}
// Handle aliases: concrete instantiations of the generic counter / gauge handles.

/// Handle for a `u64` counter vector.
pub type IntCounterVecHandle<'a, const N: usize> = Cvh<'a, AtomicU64, N>;
/// Handle for an `f64` counter vector.
pub type CounterVecHandle<'a, const N: usize> = Cvh<'a, AtomicF64, N>;
/// Handle for an `i64` gauge vector.
pub type IntGaugeVecHandle<'a, const N: usize> = Gvh<'a, AtomicI64, N>;
/// Handle for an `f64` gauge vector.
pub type GaugeVecHandle<'a, const N: usize> = Gvh<'a, AtomicF64, N>;

// Descriptor aliases: each is the descriptor type whose metric parameter is the
// `Metric` associated type of the matching handle.

/// Descriptor accepted by `Metrics::icv`.
pub type IntCounterVecDescriptor<'a, const N: usize> =
    MetricDescriptor<<IntCounterVecHandle<'a, N> as MetricHandle<N>>::Metric, N>;
/// Descriptor accepted by `Metrics::cv`.
pub type CounterVecDescriptor<'a, const N: usize> =
    MetricDescriptor<<CounterVecHandle<'a, N> as MetricHandle<N>>::Metric, N>;
/// Descriptor accepted by `Metrics::igv`.
pub type IntGaugeVecDescriptor<'a, const N: usize> =
    MetricDescriptor<<IntGaugeVecHandle<'a, N> as MetricHandle<N>>::Metric, N>;
/// Descriptor accepted by `Metrics::gv`.
pub type GaugeVecDescriptor<'a, const N: usize> =
    MetricDescriptor<<GaugeVecHandle<'a, N> as MetricHandle<N>>::Metric, N>;
/// Descriptor accepted by `Metrics::hv`.
pub type HistogramVecDescriptor<'a, const N: usize> =
    MetricDescriptor<<HistogramVecHandle<'a, N> as MetricHandle<N>>::Metric, N>;
// Registers a clone of `$metric` with `$self.registry`.
// Panics with "registry broken" if the registry rejects it.
macro_rules! register {
    ($self:ident, $metric:ident) => {
        let collector = Box::new($metric.clone());
        $self.registry.register(collector).expect("registry broken");
    };
}
// Returns a clone of the metric vector cached in `$self.$map` under the
// descriptor's name, creating and registering it on first use.
//
// The options start from `$self.opts(..)` (namespace plus constant labels) and
// now also carry the descriptor's subsystem. Previously the subsystem was
// dropped, so the registered name disagreed with `Metrics::full_name`.
//
// The cache key is the bare name only: a second descriptor with the same name
// but a different subsystem or label set receives the vector created for the
// first one.
//
// Panics if the mutex is poisoned, if prometheus rejects the options or
// labels, or if registration fails.
macro_rules! get_or_insert {
    ($self:ident, $map:ident, $metric_vec:ty, $descriptor:ident $(,)?) => {{
        $self
            .$map
            .lock()
            .expect("poison")
            .entry($descriptor.name)
            .or_insert_with(|| {
                let mut opts = $self.opts($descriptor.name, $descriptor.help);
                if let Some(subsystem) = $descriptor.subsystem {
                    opts = opts.subsystem(subsystem);
                }
                let metric = <$metric_vec>::new(opts, $descriptor.labels.as_ref())
                    .expect("invalid name, help, or labels");
                register!($self, metric);
                metric
            })
            .clone()
    }};
}
/// Owns a prometheus registry plus per-kind caches of the metric vectors that
/// have been created through it.
#[derive(Debug, Default)]
pub struct Metrics {
// Registry every created metric vector is registered with.
registry: Registry,
// Namespace applied to created metrics; empty when none was configured.
namespace: &'static str,
// Caches keyed by the descriptor's bare name, one map per metric kind.
icv: Mutex<HashMap<&'static str, IntCounterVec>>,
cv: Mutex<HashMap<&'static str, CounterVec>>,
igv: Mutex<HashMap<&'static str, IntGaugeVec>>,
gv: Mutex<HashMap<&'static str, GaugeVec>>,
hv: Mutex<HashMap<&'static str, HistogramVec>>,
}
impl Metrics {
pub fn with_namespace(namespace: &'static str) -> Self {
Self {
namespace,
..Default::default()
}
}
pub fn with_registry(registry: Registry) -> Self {
Self {
registry,
..Default::default()
}
}
pub fn with_namespace_and_registry(namespace: &'static str, registry: Registry) -> Self {
Self {
namespace,
registry,
..Default::default()
}
}
pub fn full_name<T, const N: usize>(&self, descriptor: &MetricDescriptor<T, N>) -> String
where
T: Collector,
{
if self.namespace.is_empty() {
return descriptor.qualified_name();
}
[self.namespace, &descriptor.qualified_name()].join("_")
}
pub fn opts(&self, name: &'static str, help: &'static str) -> Opts {
Opts::new(name, help)
.namespace(self.namespace)
.const_label("PKG_VERSION", env!("CARGO_PKG_VERSION"))
}
pub fn histogram_opts(
&self,
name: &'static str,
help: &'static str,
buckets: &[f64],
) -> HistogramOpts {
HistogramOpts {
common_opts: self.opts(name, help),
buckets: buckets.to_vec(),
}
}
pub fn icv<'a, const N: usize>(
&'a self,
descriptor: IntCounterVecDescriptor<'a, N>,
) -> IntCounterVecHandle<'a, N> {
let icv = get_or_insert!(self, icv, IntCounterVec, descriptor);
IntCounterVecHandle {
core: self,
descriptor,
vec: icv,
}
}
pub fn cv<'a, const N: usize>(
&'a self,
descriptor: CounterVecDescriptor<'a, N>,
) -> CounterVecHandle<'a, N> {
let cv = get_or_insert!(self, cv, CounterVec, descriptor);
CounterVecHandle {
core: self,
descriptor,
vec: cv,
}
}
pub fn igv<'a, const N: usize>(
&'a self,
descriptor: IntGaugeVecDescriptor<'a, N>,
) -> IntGaugeVecHandle<'a, N> {
let igv = get_or_insert!(self, igv, IntGaugeVec, descriptor);
IntGaugeVecHandle {
core: self,
descriptor,
vec: igv,
}
}
pub fn gv<'a, const N: usize>(
&'a self,
descriptor: GaugeVecDescriptor<'a, N>,
) -> GaugeVecHandle<'a, N> {
let gv = get_or_insert!(self, gv, GaugeVec, descriptor);
GaugeVecHandle {
core: self,
descriptor,
vec: gv,
}
}
pub fn hv<'a, const N: usize>(
&'a self,
descriptor: HistogramVecDescriptor<'a, N>,
buckets: &[f64],
) -> HistogramVecHandle<'a, N> {
let hv = self
.hv
.lock()
.expect("poison")
.entry(descriptor.name)
.or_insert_with(|| {
let metric = HistogramVec::new((&descriptor).into(), descriptor.labels.as_ref())
.expect("invalid name, help, or labels");
register!(self, metric);
metric
})
.clone();
HistogramVecHandle {
core: self,
descriptor,
buckets: buckets.to_vec(),
vec: hv,
}
}
pub fn gather(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
pub fn gather_text(&self) -> prometheus::Result<Vec<u8>> {
let collected_metrics = self.gather();
let mut out_buf = Vec::with_capacity(1024 * 64);
let encoder = prometheus::TextEncoder::new();
encoder.encode(&collected_metrics, &mut out_buf)?;
Ok(out_buf)
}
pub fn gather_with<E>(&self, encoder: &E) -> prometheus::Result<Vec<u8>>
where
E: prometheus::Encoder,
{
let collected_metrics = self.gather();
let mut out_buf = Vec::with_capacity(1024 * 64);
encoder.encode(&collected_metrics, &mut out_buf)?;
Ok(out_buf)
}
pub fn serve(self: &Arc<Self>, port: u16) -> JoinHandle<()> {
let this = self.clone();
tracing::info!(
port,
"starting prometheus server on 0.0.0.0:{port}",
port = port
);
tokio::spawn(async move {
warp::serve(
warp::path!("metrics")
.map(move || {
warp::reply::with_header(
this.gather_text().expect("failed to encode metrics"),
"Content-Type",
"text/plain; charset=utf-8",
)
})
.or(warp::any().map(|| {
warp::http::Response::builder()
.header("Location", "/metrics")
.status(301)
.body("".to_string())
})),
)
.run(([0, 0, 0, 0], port))
.await;
})
}
}