use std::{
collections::{HashMap, hash_map::Entry},
fmt::{self, Debug},
hash::{BuildHasher, Hash},
sync::Arc,
};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::{
encoder::{EncodeLabelSet, EncodeMetric, MetricEncoder},
error::Result,
raw::{LabelSetSchema, MetricLabelSet, MetricType, TypedMetric},
};
/// Type-erased factory that builds a metric `M` from a reference to its label set `LS`.
///
/// The `Send + Sync + 'static` bounds make the trait object shareable across threads.
type MetricFactory<LS, M> = dyn Fn(&LS) -> M + Send + Sync + 'static;
// Picks the `RandomState` hasher alias at compile time: `foldhash`'s fast
// `RandomState` when the `foldhash` feature is enabled, the standard library's
// `RandomState` otherwise.
cfg_if::cfg_if! {
if #[cfg(feature = "foldhash")] {
type RandomState = foldhash::fast::RandomState;
} else {
type RandomState = std::hash::RandomState;
}
}
/// A set of metrics of type `M` that are told apart by their label set `LS`.
///
/// Both fields live behind `Arc`s, so cloning a `Family` only copies two handles
/// and every clone reads and updates the same underlying map.
pub struct Family<LS, M, S = RandomState> {
    /// Label set => metric, guarded by a read/write lock.
    metrics: Arc<RwLock<HashMap<LS, M, S>>>,
    /// Builds the metric for a label set that has no entry yet.
    metric_factory: Arc<MetricFactory<LS, M>>,
}

// `Clone` is written by hand rather than derived: `#[derive(Clone)]` would add
// `LS: Clone`, `M: Clone` and `S: Clone` bounds, none of which are needed to
// copy two `Arc` handles.
impl<LS, M, S> Clone for Family<LS, M, S> {
    /// Returns a new handle to the same metric map and the same factory.
    fn clone(&self) -> Self {
        Self {
            metrics: Arc::clone(&self.metrics),
            metric_factory: Arc::clone(&self.metric_factory),
        }
    }
}
impl<LS, M, S> Debug for Family<LS, M, S>
where
    LS: Debug,
    M: Debug,
{
    /// Formats the family as a struct labelled `MetricFamily` with a single
    /// `metrics` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the metric map is printed; the factory field is not included.
        let mut out = f.debug_struct("MetricFamily");
        out.field("metrics", &self.metrics);
        out.finish()
    }
}
impl<LS, M, S> Default for Family<LS, M, S>
where
M: Default + 'static,
S: Default,
{
fn default() -> Self {
Self::new(M::default)
}
}
impl<LS, M, S> Family<LS, M, S> {
pub(crate) fn read(&self) -> RwLockReadGuard<'_, HashMap<LS, M, S>> {
self.metrics.read()
}
pub(crate) fn write(&self) -> RwLockWriteGuard<'_, HashMap<LS, M, S>> {
self.metrics.write()
}
}
impl<LS, M, S> Family<LS, M, S> {
pub fn new(metric_factory: impl Fn() -> M + Send + Sync + 'static) -> Self
where
S: Default,
{
Self::new_with_labels(move |_| metric_factory())
}
pub fn new_with_labels(metric_factory: impl Fn(&LS) -> M + Send + Sync + 'static) -> Self
where
S: Default,
{
Self {
metrics: Arc::new(RwLock::new(HashMap::default())),
metric_factory: Arc::new(metric_factory),
}
}
pub fn with<R, F>(&self, labels: &LS, func: F) -> Option<R>
where
LS: Eq + Hash,
F: FnOnce(&M) -> R,
S: BuildHasher,
{
let guard = self.read();
guard.get(labels).map(func)
}
pub fn with_or_new<R, F>(&self, labels: &LS, func: F) -> R
where
LS: Clone + Eq + Hash,
F: FnOnce(&M) -> R,
S: BuildHasher,
{
let read_guard = self.read();
if let Some(metric) = read_guard.get(labels) {
return func(metric);
}
drop(read_guard);
let mut new_metric = None;
loop {
let mut write_guard = self.write();
match write_guard.entry(labels.clone()) {
Entry::Occupied(entry) => return func(entry.get()),
Entry::Vacant(entry) => {
if let Some(metric) = new_metric.take() {
return func(entry.insert(metric));
} else {
drop(write_guard);
new_metric = Some((self.metric_factory)(labels));
}
},
}
}
}
}
// A family reports the same metric type as the metric `M` it holds.
impl<LS, M, S> TypedMetric for Family<LS, M, S>
where
    M: TypedMetric,
{
    const TYPE: MetricType = M::TYPE;
}
// The family's label set type is the key type `LS` of its metric map.
impl<LS, M, S> MetricLabelSet for Family<LS, M, S>
where
    LS: LabelSetSchema,
{
    type LabelSet = LS;
}
impl<LS, M, S> EncodeMetric for Family<LS, M, S>
where
    LS: EncodeLabelSet + Send + Sync,
    M: EncodeMetric,
    S: Send + Sync,
{
    /// Passes every `(label set, metric)` pair of the family to `encoder`,
    /// stopping at the first error. The read lock is held for the whole pass.
    fn encode(&self, encoder: &mut dyn MetricEncoder) -> Result<()> {
        let metrics = self.read();
        for (label_set, metric) in &*metrics {
            encoder.encode(label_set, metric)?;
        }
        Ok(())
    }

    /// Returns `true` when the family currently holds no metrics.
    fn is_empty(&self) -> bool {
        let metrics = self.read();
        metrics.is_empty()
    }
}
// Unit tests for `Family`: text encoding of populated and empty families, and
// factories that build metrics from their label set.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
encoder::{EncodeLabelSet, EncodeLabelValue, LabelEncoder, LabelSetEncoder},
metrics::{
check_text_encoding,
counter::{Counter, LazyCounter},
histogram::{Histogram, exponential_buckets},
},
};
// Label set shared by the tests: HTTP method, status code and an optional error flag.
#[derive(Clone, PartialEq, Eq, Hash)]
struct Labels {
method: Method,
status: u16,
error: Option<bool>,
}
// HTTP method label value; encoded as "GET" / "PUT" below.
#[derive(Clone, PartialEq, Eq, Hash)]
enum Method {
Get,
Put,
}
// Declares the label names in the same order in which `encode` emits them.
impl LabelSetSchema for Labels {
fn names() -> Option<&'static [&'static str]> {
Some(&["method", "status", "error"])
}
}
impl EncodeLabelSet for Labels {
fn encode(&self, encoder: &mut dyn LabelSetEncoder) -> Result<()> {
encoder.encode(&("method", &self.method))?;
encoder.encode(&("status", self.status))?;
encoder.encode(&("error", self.error))?;
Ok(())
}
}
impl EncodeLabelValue for Method {
fn encode(&self, encoder: &mut dyn LabelEncoder) -> Result<()> {
match self {
Self::Get => encoder.encode_str_value("GET"),
Self::Put => encoder.encode_str_value("PUT"),
}
}
}
// Registers a counter family and a histogram family, touches three distinct
// label sets in each, and checks that every label set gets its own sample lines.
// The expected lines carry no `error` label when `error` is `None`.
#[test]
fn test_metric_family() {
check_text_encoding(
|registry| {
let http_requests = Family::<Labels, Counter>::default();
// A clone is registered; the original handle is used to update the metrics.
registry
.register("http_requests", "Total HTTP requests", http_requests.clone())
.unwrap();
let labels = Labels { method: Method::Get, status: 200, error: None };
http_requests.with_or_new(&labels, |metric| metric.inc());
let labels = Labels { method: Method::Get, status: 404, error: Some(true) };
http_requests.with_or_new(&labels, |metric| metric.inc());
let labels = Labels { method: Method::Put, status: 200, error: None };
http_requests.with_or_new(&labels, |metric| metric.inc());
},
|output| {
assert!(output.contains(r#"http_requests_total{method="GET",status="200"} 1"#));
assert!(
output.contains(
r#"http_requests_total{method="GET",status="404",error="true"} 1"#
)
);
assert!(output.contains(r#"http_requests_total{method="PUT",status="200"} 1"#));
},
);
// Same scenario with histograms built by an explicit factory closure.
check_text_encoding(
|registry| {
let http_requests_duration_seconds = Family::<Labels, Histogram>::new(|| {
Histogram::new(exponential_buckets(0.005, 2.0, 10))
});
registry
.register(
"http_requests_duration_seconds",
"Duration of HTTP requests",
http_requests_duration_seconds.clone(),
)
.unwrap();
let labels = Labels { method: Method::Get, status: 200, error: None };
http_requests_duration_seconds.with_or_new(&labels, |hist| hist.observe(0.1));
let labels = Labels { method: Method::Get, status: 404, error: Some(true) };
http_requests_duration_seconds.with_or_new(&labels, |hist| hist.observe(0.1));
let labels = Labels { method: Method::Put, status: 200, error: None };
http_requests_duration_seconds.with_or_new(&labels, |hist| hist.observe(2.0));
},
|output| {
assert!(output.contains(
r#"http_requests_duration_seconds_count{method="GET",status="200"} 1"#
));
assert!(output.contains(
r#"http_requests_duration_seconds_sum{method="GET",status="200"} 0.1"#
));
assert!(output.contains(
r#"http_requests_duration_seconds_count{method="GET",status="404",error="true"} 1"#
));
assert!(output.contains(
r#"http_requests_duration_seconds_sum{method="GET",status="404",error="true"} 0.1"#
));
assert!(output.contains(
r#"http_requests_duration_seconds_count{method="PUT",status="200"} 1"#
));
assert!(output.contains(
r#"http_requests_duration_seconds_sum{method="PUT",status="200"} 2.0"#
));
},
);
}
// A registered family with no metrics is expected to produce only "# EOF";
// once a label set has been touched (even without incrementing), the family is
// expected to appear with its TYPE/HELP lines and a sample of value 0.
#[test]
fn test_empty_metric_family() {
check_text_encoding(
|registry| {
let http_requests = Family::<Labels, Counter>::default();
registry
.register("http_requests", "Total HTTP requests", http_requests)
.unwrap();
},
|output| {
assert_eq!(output, "# EOF\n");
},
);
check_text_encoding(
|registry| {
let http_requests = Family::<Labels, Counter>::default();
registry
.register("http_requests", "Total HTTP requests", http_requests.clone())
.unwrap();
let labels = Labels { method: Method::Get, status: 200, error: None };
// The no-op closure only forces the metric for this label set to be created.
http_requests.with_or_new(&labels, |_| {});
},
|output| {
assert!(output.contains("# TYPE http_requests counter"));
assert!(output.contains("# HELP http_requests Total HTTP requests"));
assert!(output.contains(r#"http_requests_total{method="GET",status="200"} 0"#));
},
);
}
// Builds a family with `new_with_labels` whose factory derives each counter's
// value from its labels: 1000 for GET or 2000 for PUT, plus 10000 when `error`
// is `Some(true)`, plus the status code.
#[test]
fn test_new_uses_label_aware_factory() {
let family = Family::<Labels, LazyCounter<u64>>::new_with_labels(|labels| {
let method = labels.method.clone();
let status = u64::from(labels.status);
let error = labels.error.unwrap_or(false);
LazyCounter::new(move || {
let method_value = match method {
Method::Get => 1_000_u64,
Method::Put => 2_000_u64,
};
let error_value = if error { 10_000_u64 } else { 0_u64 };
method_value + error_value + status
})
});
let labels_get = Labels { method: Method::Get, status: 200, error: None };
let labels_put = Labels { method: Method::Put, status: 404, error: Some(true) };
let get_total = family.with_or_new(&labels_get, |counter| counter.fetch());
assert_eq!(get_total, 1_200_u64);
// A second access with the same labels must observe the same value.
let get_total_reused = family.with_or_new(&labels_get, |counter| counter.fetch());
assert_eq!(get_total_reused, 1_200_u64);
let put_total = family.with_or_new(&labels_put, |counter| counter.fetch());
assert_eq!(put_total, 12_404_u64);
// `with` finds the metric created by the earlier `with_or_new` call.
assert_eq!(family.with(&labels_get, |counter| counter.fetch()), Some(1_200_u64));
}
}